Use async_grpc (#1013)

Use async_grpc
master
Christoph Schütte 2018-03-23 10:52:48 +01:00 committed by GitHub
parent ab1c99ab34
commit 915ebb19a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
80 changed files with 296 additions and 2843 deletions

View File

@ -34,6 +34,10 @@ find_package(Eigen3 REQUIRED)
find_package(LuaGoogle REQUIRED)
find_package(Protobuf 3.0.0 REQUIRED)
if (${BUILD_GRPC})
find_package(async_grpc REQUIRED)
endif()
include(FindPkgConfig)
PKG_SEARCH_MODULE(CAIRO REQUIRED cairo>=1.12.16)
@ -172,6 +176,7 @@ if(${BUILD_GRPC})
cartographer/cloud/map_builder_server_main.cc
)
target_link_libraries(cartographer_grpc_server PUBLIC grpc++)
target_link_libraries(cartographer_grpc_server PUBLIC async_grpc)
if(${BUILD_PROMETHEUS})
target_link_libraries(cartographer_grpc_server PUBLIC prometheus-cpp)
endif()
@ -186,6 +191,7 @@ foreach(ABS_FIL ${ALL_TESTS})
google_test("${TEST_TARGET_NAME}" ${ABS_FIL})
if(${BUILD_GRPC})
target_link_libraries("${TEST_TARGET_NAME}" PUBLIC grpc++)
target_link_libraries("${TEST_TARGET_NAME}" PUBLIC async_grpc)
endif()
if(${BUILD_PROMETHEUS})
target_link_libraries(${PROJECT_NAME} PUBLIC prometheus-cpp)
@ -223,6 +229,7 @@ target_include_directories(${PROJECT_NAME} SYSTEM PUBLIC
target_link_libraries(${PROJECT_NAME} PUBLIC ${PROTOBUF_LIBRARY} pthread)
if(${BUILD_GRPC})
target_link_libraries(${PROJECT_NAME} PUBLIC grpc++)
target_link_libraries(${PROJECT_NAME} PUBLIC async_grpc)
endif()
if(${BUILD_PROMETHEUS})
target_link_libraries(${PROJECT_NAME} PUBLIC prometheus-cpp)

View File

@ -21,6 +21,8 @@ COPY scripts/install_proto3.sh cartographer/scripts/
RUN cartographer/scripts/install_proto3.sh && rm -rf protobuf
COPY scripts/install_grpc.sh cartographer/scripts/
RUN cartographer/scripts/install_grpc.sh && rm -rf grpc
COPY scripts/install_async_grpc.sh cartographer/scripts/
RUN cartographer/scripts/install_async_grpc.sh && rm -rf async_grpc
COPY scripts/install_prometheus_cpp.sh cartographer/scripts/
RUN cartographer/scripts/install_prometheus_cpp.sh && rm -rf prometheus-cpp
COPY . cartographer

View File

@ -226,6 +226,14 @@ def cartographer_repositories():
],
)
_maybe(native.http_archive,
name = "com_github_googlecartographer_async_grpc",
strip_prefix = "async_grpc-654c75ebf553c2bdb624c87a690f5a238aeb651f",
urls = [
"https://github.com/googlecartographer/async_grpc/archive/654c75ebf553c2bdb624c87a690f5a238aeb651f.tar.gz",
],
)
# TODO(rodrigoq): remove these binds once grpc#14140 has been merged, as well
# as removing `use_external` in cartographer_grpc/BUILD.bazel.
# https://github.com/grpc/grpc/pull/14140

View File

@ -70,6 +70,7 @@ cc_library(
":cc_protos",
"@boost//:iostreams",
"@com_google_glog//:glog",
"@com_github_googlecartographer_async_grpc//async_grpc:async_grpc",
"@org_cairographics_cairo//:cairo",
"@org_ceres_solver_ceres_solver//:ceres",
"@org_lua_lua//:lua",

View File

@ -49,8 +49,8 @@ int MapBuilderStub::AddTrajectoryBuilder(
for (const auto& sensor_id : expected_sensor_ids) {
*request.add_expected_sensor_ids() = cloud::ToProto(sensor_id);
}
framework::Client<handlers::AddTrajectoryHandler> client(
client_channel_, framework::CreateLimitedBackoffStrategy(
async_grpc::Client<handlers::AddTrajectorySignature> client(
client_channel_, async_grpc::CreateLimitedBackoffStrategy(
common::FromMilliseconds(100), 2.f, 5));
CHECK(client.Write(request));
@ -78,7 +78,8 @@ mapping::TrajectoryBuilderInterface* MapBuilderStub::GetTrajectoryBuilder(
void MapBuilderStub::FinishTrajectory(int trajectory_id) {
proto::FinishTrajectoryRequest request;
request.set_trajectory_id(trajectory_id);
framework::Client<handlers::FinishTrajectoryHandler> client(client_channel_);
async_grpc::Client<handlers::FinishTrajectorySignature> client(
client_channel_);
CHECK(client.Write(request));
trajectory_builder_stubs_.erase(trajectory_id);
}
@ -88,7 +89,7 @@ std::string MapBuilderStub::SubmapToProto(
mapping::proto::SubmapQuery::Response* submap_query_response) {
proto::GetSubmapRequest request;
submap_id.ToProto(request.mutable_submap_id());
framework::Client<handlers::GetSubmapHandler> client(client_channel_);
async_grpc::Client<handlers::GetSubmapSignature> client(client_channel_);
CHECK(client.Write(request));
submap_query_response->CopyFrom(client.response().submap_query_response());
return client.response().error_msg();
@ -96,10 +97,10 @@ std::string MapBuilderStub::SubmapToProto(
void MapBuilderStub::SerializeState(io::ProtoStreamWriterInterface* writer) {
google::protobuf::Empty request;
framework::Client<handlers::WriteStateHandler> client(client_channel_);
async_grpc::Client<handlers::WriteStateSignature> client(client_channel_);
CHECK(client.Write(request));
proto::WriteStateResponse response;
while (client.Read(&response)) {
while (client.StreamRead(&response)) {
switch (response.state_chunk_case()) {
case proto::WriteStateResponse::kPoseGraph:
writer->WriteProto(response.pose_graph());
@ -121,7 +122,7 @@ void MapBuilderStub::LoadState(io::ProtoStreamReaderInterface* reader,
if (!load_frozen_state) {
LOG(FATAL) << "Not implemented";
}
framework::Client<handlers::LoadStateHandler> client(client_channel_);
async_grpc::Client<handlers::LoadStateSignature> client(client_channel_);
// Request with a PoseGraph proto is sent first.
{
proto::LoadStateRequest request;
@ -141,8 +142,8 @@ void MapBuilderStub::LoadState(io::ProtoStreamReaderInterface* reader,
}
CHECK(reader->eof());
CHECK(client.WritesDone());
CHECK(client.Finish().ok());
CHECK(client.StreamWritesDone());
CHECK(client.StreamFinish().ok());
}
int MapBuilderStub::num_trajectory_builders() const {

View File

@ -15,7 +15,7 @@
*/
#include "cartographer/cloud/internal/client/pose_graph_stub.h"
#include "cartographer/cloud/internal/framework/client.h"
#include "async_grpc/client.h"
#include "cartographer/cloud/internal/handlers/get_all_submap_poses.h"
#include "cartographer/cloud/internal/handlers/get_constraints_handler.h"
#include "cartographer/cloud/internal/handlers/get_landmark_poses_handler.h"
@ -34,7 +34,7 @@ PoseGraphStub::PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel)
void PoseGraphStub::RunFinalOptimization() {
google::protobuf::Empty request;
framework::Client<handlers::RunFinalOptimizationHandler> client(
async_grpc::Client<handlers::RunFinalOptimizationSignature> client(
client_channel_);
CHECK(client.Write(request));
}
@ -47,7 +47,8 @@ PoseGraphStub::GetAllSubmapData() {
mapping::MapById<mapping::SubmapId, mapping::PoseGraphInterface::SubmapPose>
PoseGraphStub::GetAllSubmapPoses() {
google::protobuf::Empty request;
framework::Client<handlers::GetAllSubmapPosesHandler> client(client_channel_);
async_grpc::Client<handlers::GetAllSubmapPosesSignature> client(
client_channel_);
CHECK(client.Write(request));
mapping::MapById<mapping::SubmapId, mapping::PoseGraphInterface::SubmapPose>
submap_poses;
@ -65,7 +66,7 @@ PoseGraphStub::GetAllSubmapPoses() {
transform::Rigid3d PoseGraphStub::GetLocalToGlobalTransform(int trajectory_id) {
proto::GetLocalToGlobalTransformRequest request;
request.set_trajectory_id(trajectory_id);
framework::Client<handlers::GetLocalToGlobalTransformHandler> client(
async_grpc::Client<handlers::GetLocalToGlobalTransformSignature> client(
client_channel_);
CHECK(client.Write(request));
return transform::ToRigid3(client.response().local_to_global());
@ -79,7 +80,7 @@ PoseGraphStub::GetTrajectoryNodes() {
mapping::MapById<mapping::NodeId, mapping::TrajectoryNodePose>
PoseGraphStub::GetTrajectoryNodePoses() {
google::protobuf::Empty request;
framework::Client<handlers::GetTrajectoryNodePosesHandler> client(
async_grpc::Client<handlers::GetTrajectoryNodePosesSignature> client(
client_channel_);
CHECK(client.Write(request));
mapping::MapById<mapping::NodeId, mapping::TrajectoryNodePose> node_poses;
@ -95,7 +96,8 @@ PoseGraphStub::GetTrajectoryNodePoses() {
std::map<std::string, transform::Rigid3d> PoseGraphStub::GetLandmarkPoses() {
google::protobuf::Empty request;
framework::Client<handlers::GetLandmarkPosesHandler> client(client_channel_);
async_grpc::Client<handlers::GetLandmarkPosesSignature> client(
client_channel_);
CHECK(client.Write(request));
std::map<std::string, transform::Rigid3d> landmark_poses;
for (const auto& landmark_pose : client.response().landmark_poses()) {
@ -121,7 +123,7 @@ PoseGraphStub::GetTrajectoryData() {
std::vector<mapping::PoseGraphInterface::Constraint>
PoseGraphStub::constraints() {
google::protobuf::Empty request;
framework::Client<handlers::GetConstraintsHandler> client(client_channel_);
async_grpc::Client<handlers::GetConstraintsSignature> client(client_channel_);
CHECK(client.Write(request));
return mapping::FromProto(client.response().constraints());
}

View File

@ -50,24 +50,24 @@ TrajectoryBuilderStub::~TrajectoryBuilderStub() {
receive_local_slam_results_thread_->join();
}
if (add_rangefinder_client_) {
CHECK(add_rangefinder_client_->WritesDone());
CHECK(add_rangefinder_client_->Finish().ok());
CHECK(add_rangefinder_client_->StreamWritesDone());
CHECK(add_rangefinder_client_->StreamFinish().ok());
}
if (add_imu_client_) {
CHECK(add_imu_client_->WritesDone());
CHECK(add_imu_client_->Finish().ok());
CHECK(add_imu_client_->StreamWritesDone());
CHECK(add_imu_client_->StreamFinish().ok());
}
if (add_odometry_client_) {
CHECK(add_odometry_client_->WritesDone());
CHECK(add_odometry_client_->Finish().ok());
CHECK(add_odometry_client_->StreamWritesDone());
CHECK(add_odometry_client_->StreamFinish().ok());
}
if (add_landmark_client_) {
CHECK(add_landmark_client_->WritesDone());
CHECK(add_landmark_client_->Finish().ok());
CHECK(add_landmark_client_->StreamWritesDone());
CHECK(add_landmark_client_->StreamFinish().ok());
}
if (add_fixed_frame_pose_client_) {
CHECK(add_fixed_frame_pose_client_->WritesDone());
CHECK(add_fixed_frame_pose_client_->Finish().ok());
CHECK(add_fixed_frame_pose_client_->StreamWritesDone());
CHECK(add_fixed_frame_pose_client_->StreamFinish().ok());
}
}
@ -76,7 +76,7 @@ void TrajectoryBuilderStub::AddSensorData(
const sensor::TimedPointCloudData& timed_point_cloud_data) {
if (!add_rangefinder_client_) {
add_rangefinder_client_ = common::make_unique<
framework::Client<handlers::AddRangefinderDataHandler>>(
async_grpc::Client<handlers::AddRangefinderDataSignature>>(
client_channel_);
}
proto::AddRangefinderDataRequest request;
@ -90,7 +90,7 @@ void TrajectoryBuilderStub::AddSensorData(const std::string& sensor_id,
const sensor::ImuData& imu_data) {
if (!add_imu_client_) {
add_imu_client_ =
common::make_unique<framework::Client<handlers::AddImuDataHandler>>(
common::make_unique<async_grpc::Client<handlers::AddImuDataSignature>>(
client_channel_);
}
proto::AddImuDataRequest request;
@ -103,7 +103,8 @@ void TrajectoryBuilderStub::AddSensorData(
const std::string& sensor_id, const sensor::OdometryData& odometry_data) {
if (!add_odometry_client_) {
add_odometry_client_ = common::make_unique<
framework::Client<handlers::AddOdometryDataHandler>>(client_channel_);
async_grpc::Client<handlers::AddOdometryDataSignature>>(
client_channel_);
}
proto::AddOdometryDataRequest request;
CreateAddOdometryDataRequest(sensor_id, trajectory_id_,
@ -116,7 +117,7 @@ void TrajectoryBuilderStub::AddSensorData(
const sensor::FixedFramePoseData& fixed_frame_pose) {
if (!add_fixed_frame_pose_client_) {
add_fixed_frame_pose_client_ = common::make_unique<
framework::Client<handlers::AddFixedFramePoseDataHandler>>(
async_grpc::Client<handlers::AddFixedFramePoseDataSignature>>(
client_channel_);
}
proto::AddFixedFramePoseDataRequest request;
@ -129,7 +130,8 @@ void TrajectoryBuilderStub::AddSensorData(
const std::string& sensor_id, const sensor::LandmarkData& landmark_data) {
if (!add_landmark_client_) {
add_landmark_client_ = common::make_unique<
framework::Client<handlers::AddLandmarkDataHandler>>(client_channel_);
async_grpc::Client<handlers::AddLandmarkDataSignature>>(
client_channel_);
}
proto::AddLandmarkDataRequest request;
CreateAddLandmarkDataRequest(sensor_id, trajectory_id_,
@ -143,10 +145,10 @@ void TrajectoryBuilderStub::AddLocalSlamResultData(
}
void TrajectoryBuilderStub::RunLocalSlamResultsReader(
framework::Client<handlers::ReceiveLocalSlamResultsHandler>* client,
async_grpc::Client<handlers::ReceiveLocalSlamResultsSignature>* client,
LocalSlamResultCallback local_slam_result_callback) {
proto::ReceiveLocalSlamResultsResponse response;
while (client->Read(&response)) {
while (client->StreamRead(&response)) {
int trajectory_id = response.trajectory_id();
common::Time time = common::FromUniversal(response.timestamp());
transform::Rigid3d local_pose = transform::ToRigid3(response.local_pose());
@ -161,7 +163,7 @@ void TrajectoryBuilderStub::RunLocalSlamResultsReader(
local_slam_result_callback(trajectory_id, time, local_pose, range_data,
std::move(insertion_result));
}
client->Finish();
client->StreamFinish();
}
} // namespace cloud

View File

@ -19,7 +19,7 @@
#include <thread>
#include "cartographer/cloud/internal/framework/client.h"
#include "async_grpc/client.h"
#include "cartographer/cloud/internal/handlers/add_fixed_frame_pose_data_handler.h"
#include "cartographer/cloud/internal/handlers/add_imu_data_handler.h"
#include "cartographer/cloud/internal/handlers/add_landmark_data_handler.h"
@ -62,23 +62,23 @@ class TrajectoryBuilderStub : public mapping::TrajectoryBuilderInterface {
private:
static void RunLocalSlamResultsReader(
framework::Client<handlers::ReceiveLocalSlamResultsHandler>*
async_grpc::Client<handlers::ReceiveLocalSlamResultsSignature>*
client_reader,
LocalSlamResultCallback local_slam_result_callback);
std::shared_ptr<::grpc::Channel> client_channel_;
const int trajectory_id_;
std::unique_ptr<framework::Client<handlers::AddRangefinderDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddRangefinderDataSignature>>
add_rangefinder_client_;
std::unique_ptr<framework::Client<handlers::AddImuDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddImuDataSignature>>
add_imu_client_;
std::unique_ptr<framework::Client<handlers::AddOdometryDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddOdometryDataSignature>>
add_odometry_client_;
std::unique_ptr<framework::Client<handlers::AddFixedFramePoseDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddFixedFramePoseDataSignature>>
add_fixed_frame_pose_client_;
std::unique_ptr<framework::Client<handlers::AddLandmarkDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddLandmarkDataSignature>>
add_landmark_client_;
framework::Client<handlers::ReceiveLocalSlamResultsHandler>
async_grpc::Client<handlers::ReceiveLocalSlamResultsSignature>
receive_local_slam_results_client_;
std::unique_ptr<std::thread> receive_local_slam_results_thread_;
};

View File

@ -1,205 +0,0 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_CLIENT_H
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_CLIENT_H
#include "cartographer/cloud/internal/framework/retry.h"
#include "cartographer/cloud/internal/framework/rpc_handler_interface.h"
#include "cartographer/cloud/internal/framework/type_traits.h"
#include "glog/logging.h"
#include "grpc++/grpc++.h"
#include "grpc++/impl/codegen/client_unary_call.h"
#include "grpc++/impl/codegen/sync_stream.h"
namespace cartographer {
namespace cloud {
namespace framework {
template <typename RpcHandlerType>
class Client {
public:
Client(std::shared_ptr<::grpc::Channel> channel, RetryStrategy retry_strategy)
: channel_(channel),
client_context_(common::make_unique<::grpc::ClientContext>()),
rpc_method_name_(
RpcHandlerInterface::Instantiate<RpcHandlerType>()->method_name()),
rpc_method_(rpc_method_name_.c_str(),
RpcType<typename RpcHandlerType::IncomingType,
typename RpcHandlerType::OutgoingType>::value,
channel_),
retry_strategy_(retry_strategy) {
CHECK(!retry_strategy ||
rpc_method_.method_type() == ::grpc::internal::RpcMethod::NORMAL_RPC)
<< "Retry is currently only support for NORMAL_RPC.";
}
Client(std::shared_ptr<::grpc::Channel> channel)
: channel_(channel),
client_context_(common::make_unique<::grpc::ClientContext>()),
rpc_method_name_(
RpcHandlerInterface::Instantiate<RpcHandlerType>()->method_name()),
rpc_method_(rpc_method_name_.c_str(),
RpcType<typename RpcHandlerType::IncomingType,
typename RpcHandlerType::OutgoingType>::value,
channel_) {}
bool Read(typename RpcHandlerType::ResponseType *response) {
switch (rpc_method_.method_type()) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
InstantiateClientReaderWriterIfNeeded();
return client_reader_writer_->Read(response);
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
CHECK(client_reader_);
return client_reader_->Read(response);
default:
LOG(FATAL) << "Not implemented.";
}
}
bool Write(const typename RpcHandlerType::RequestType &request) {
return RetryWithStrategy(
retry_strategy_,
std::bind(&Client<RpcHandlerType>::WriteImpl, this, request),
std::bind(&Client<RpcHandlerType>::Reset, this));
}
bool WritesDone() {
switch (rpc_method_.method_type()) {
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
InstantiateClientWriterIfNeeded();
return client_writer_->WritesDone();
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
InstantiateClientReaderWriterIfNeeded();
return client_reader_writer_->WritesDone();
default:
LOG(FATAL) << "Not implemented.";
}
}
::grpc::Status Finish() {
switch (rpc_method_.method_type()) {
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
InstantiateClientWriterIfNeeded();
return client_writer_->Finish();
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
InstantiateClientReaderWriterIfNeeded();
return client_reader_writer_->Finish();
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
CHECK(client_reader_);
return client_reader_->Finish();
default:
LOG(FATAL) << "Not implemented.";
}
}
const typename RpcHandlerType::ResponseType &response() {
CHECK(rpc_method_.method_type() ==
::grpc::internal::RpcMethod::NORMAL_RPC ||
rpc_method_.method_type() ==
::grpc::internal::RpcMethod::CLIENT_STREAMING);
return response_;
}
private:
void Reset() {
client_context_ = common::make_unique<::grpc::ClientContext>();
}
bool WriteImpl(const typename RpcHandlerType::RequestType &request) {
switch (rpc_method_.method_type()) {
case ::grpc::internal::RpcMethod::NORMAL_RPC:
return MakeBlockingUnaryCall(request, &response_).ok();
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
InstantiateClientWriterIfNeeded();
return client_writer_->Write(request);
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
InstantiateClientReaderWriterIfNeeded();
return client_reader_writer_->Write(request);
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
InstantiateClientReader(request);
return true;
}
LOG(FATAL) << "Not reached.";
}
void InstantiateClientWriterIfNeeded() {
CHECK_EQ(rpc_method_.method_type(),
::grpc::internal::RpcMethod::CLIENT_STREAMING);
if (!client_writer_) {
client_writer_.reset(
::grpc::internal::
ClientWriterFactory<typename RpcHandlerType::RequestType>::Create(
channel_.get(), rpc_method_, client_context_.get(),
&response_));
}
}
void InstantiateClientReaderWriterIfNeeded() {
CHECK_EQ(rpc_method_.method_type(),
::grpc::internal::RpcMethod::BIDI_STREAMING);
if (!client_reader_writer_) {
client_reader_writer_.reset(
::grpc::internal::ClientReaderWriterFactory<
typename RpcHandlerType::RequestType,
typename RpcHandlerType::ResponseType>::Create(channel_.get(),
rpc_method_,
client_context_
.get()));
}
}
void InstantiateClientReader(
const typename RpcHandlerType::RequestType &request) {
CHECK_EQ(rpc_method_.method_type(),
::grpc::internal::RpcMethod::SERVER_STREAMING);
client_reader_.reset(
::grpc::internal::
ClientReaderFactory<typename RpcHandlerType::ResponseType>::Create(
channel_.get(), rpc_method_, client_context_.get(), request));
}
::grpc::Status MakeBlockingUnaryCall(
const typename RpcHandlerType::RequestType &request,
typename RpcHandlerType::ResponseType *response) {
CHECK_EQ(rpc_method_.method_type(),
::grpc::internal::RpcMethod::NORMAL_RPC);
return ::grpc::internal::BlockingUnaryCall(
channel_.get(), rpc_method_, client_context_.get(), request, response);
}
std::shared_ptr<::grpc::Channel> channel_;
std::unique_ptr<::grpc::ClientContext> client_context_;
const std::string rpc_method_name_;
const ::grpc::internal::RpcMethod rpc_method_;
std::unique_ptr<::grpc::ClientWriter<typename RpcHandlerType::RequestType>>
client_writer_;
std::unique_ptr<
::grpc::ClientReaderWriter<typename RpcHandlerType::RequestType,
typename RpcHandlerType::ResponseType>>
client_reader_writer_;
std::unique_ptr<::grpc::ClientReader<typename RpcHandlerType::ResponseType>>
client_reader_;
typename RpcHandlerType::ResponseType response_;
RetryStrategy retry_strategy_;
};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_CLIENT_H

View File

@ -1,48 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer/cloud/internal/framework/completion_queue_thread.h"
#include "cartographer/common/make_unique.h"
#include "glog/logging.h"
namespace cartographer {
namespace cloud {
namespace framework {
CompletionQueueThread::CompletionQueueThread(
std::unique_ptr<::grpc::ServerCompletionQueue> completion_queue)
: completion_queue_(std::move(completion_queue)) {}
::grpc::ServerCompletionQueue* CompletionQueueThread::completion_queue() {
return completion_queue_.get();
}
void CompletionQueueThread::Start(CompletionQueueRunner runner) {
CHECK(!worker_thread_);
worker_thread_ = common::make_unique<std::thread>(
[this, runner]() { runner(this->completion_queue_.get()); });
}
void CompletionQueueThread::Shutdown() {
LOG(INFO) << "Shutting down completion queue " << completion_queue_.get();
completion_queue_->Shutdown();
worker_thread_->join();
}
} // namespace framework
} // namespace cloud
} // namespace cartographer

View File

@ -1,50 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_INTERNAL_FRAMEWORK_COMPLETION_QUEUE_THREAD_H_
#define CARTOGRAPHER_INTERNAL_FRAMEWORK_COMPLETION_QUEUE_THREAD_H_
#include <grpc++/grpc++.h>
#include <memory>
#include <thread>
namespace cartographer {
namespace cloud {
namespace framework {
class CompletionQueueThread {
public:
using CompletionQueueRunner =
std::function<void(::grpc::ServerCompletionQueue*)>;
explicit CompletionQueueThread(
std::unique_ptr<::grpc::ServerCompletionQueue> completion_queue);
::grpc::ServerCompletionQueue* completion_queue();
void Start(CompletionQueueRunner runner);
void Shutdown();
private:
std::unique_ptr<::grpc::ServerCompletionQueue> completion_queue_;
std::unique_ptr<std::thread> worker_thread_;
};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_INTERNAL_FRAMEWORK_COMPLETION_QUEUE_THREAD_H_

View File

@ -1,46 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer/cloud/internal/framework/event_queue_thread.h"
#include "cartographer/common/make_unique.h"
#include "glog/logging.h"
namespace cartographer {
namespace cloud {
namespace framework {
EventQueueThread::EventQueueThread() {
event_queue_ = common::make_unique<EventQueue>();
}
EventQueue* EventQueueThread::event_queue() { return event_queue_.get(); }
void EventQueueThread::Start(EventQueueRunner runner) {
CHECK(!thread_);
EventQueue* event_queue = event_queue_.get();
thread_ = common::make_unique<std::thread>(
[event_queue, runner]() { runner(event_queue); });
}
void EventQueueThread::Shutdown() {
LOG(INFO) << "Shutting down event queue " << event_queue_.get();
thread_->join();
}
} // namespace framework
} // namespace cloud
} // namespace cartographer

View File

@ -1,50 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EVENT_QUEUE_THREAD_H
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EVENT_QUEUE_THREAD_H
#include <memory>
#include <thread>
#include "cartographer/cloud/internal/framework/rpc.h"
#include "cartographer/common/blocking_queue.h"
namespace cartographer {
namespace cloud {
namespace framework {
class EventQueueThread {
public:
using EventQueueRunner = std::function<void(EventQueue*)>;
EventQueueThread();
EventQueue* event_queue();
void Start(EventQueueRunner runner);
void Shutdown();
private:
std::unique_ptr<EventQueue> event_queue_;
std::unique_ptr<std::thread> thread_;
};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EVENT_QUEUE_THREAD_H

View File

@ -1,66 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EXECUTION_CONTEXT_H
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EXECUTION_CONTEXT_H
#include "cartographer/common/mutex.h"
#include "glog/logging.h"
namespace cartographer {
namespace cloud {
namespace framework {
// Implementations of this class allow RPC handlers to share state among one
// another. Using Server::SetExecutionContext(...) a server-wide
// 'ExecutionContext' can be specified. This 'ExecutionContext' can be retrieved
// by all implementations of 'RpcHandler' by calling
// 'RpcHandler::GetContext<MyContext>()'.
class ExecutionContext {
public:
// Automatically locks an ExecutionContext for shared use by RPC handlers.
// This non-movable, non-copyable class is used to broker access from various
// RPC handlers to the shared 'ExecutionContext'.
template <typename ContextType>
class Synchronized {
public:
ContextType* operator->() {
return static_cast<ContextType*>(execution_context_);
}
Synchronized(common::Mutex* lock, ExecutionContext* execution_context)
: locker_(lock), execution_context_(execution_context) {}
Synchronized(const Synchronized&) = delete;
Synchronized(Synchronized&&) = delete;
private:
common::MutexLocker locker_;
ExecutionContext* execution_context_;
};
ExecutionContext() = default;
virtual ~ExecutionContext() = default;
ExecutionContext(const ExecutionContext&) = delete;
ExecutionContext& operator=(const ExecutionContext&) = delete;
common::Mutex* lock() { return &lock_; }
private:
common::Mutex lock_;
};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EXECUTION_CONTEXT_H

View File

@ -1,58 +0,0 @@
// Copyright 2017 The Cartographer Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package cartographer.cloud.framework.proto;
message GetSumRequest {
int32 input = 1;
}
message GetSumResponse {
int32 output = 1;
}
message GetSquareRequest {
int32 input = 1;
}
message GetSquareResponse {
int32 output = 1;
}
message GetEchoRequest {
int32 input = 1;
}
message GetEchoResponse {
int32 output = 1;
}
message GetSequenceRequest {
int32 input = 1;
}
message GetSequenceResponse {
int32 output = 1;
}
// Provides information about the gRPC server.
service Math {
rpc GetSum(stream GetSumRequest) returns (GetSumResponse);
rpc GetSquare(GetSquareRequest) returns (GetSquareResponse);
rpc GetRunningSum(stream GetSumRequest) returns (stream GetSumResponse);
rpc GetEcho(GetEchoRequest) returns (GetEchoResponse);
rpc GetSequence(GetSequenceRequest) returns (stream GetSequenceResponse);
}

View File

@ -1,92 +0,0 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <thread>
#include "cartographer/cloud/internal/framework/retry.h"
#include "glog/logging.h"
namespace cartographer {
namespace cloud {
namespace framework {
RetryStrategy CreateRetryStrategy(RetryIndicator retry_indicator,
RetryDelayCalculator retry_delay_calculator) {
return [retry_indicator, retry_delay_calculator](int failed_attempts) {
if (!retry_indicator(failed_attempts)) {
return optional<Duration>();
}
return optional<Duration>(retry_delay_calculator(failed_attempts));
};
}
RetryIndicator CreateLimitedRetryIndicator(int max_attempts) {
return [max_attempts](int failed_attempts) {
return failed_attempts < max_attempts;
};
}
RetryDelayCalculator CreateBackoffDelayCalculator(Duration min_delay,
float backoff_factor) {
return [min_delay, backoff_factor](int failed_attempts) -> Duration {
CHECK_GE(failed_attempts, 0);
using common::FromSeconds;
using common::ToSeconds;
return FromSeconds(std::pow(backoff_factor, failed_attempts - 1) *
ToSeconds(min_delay));
};
}
RetryStrategy CreateLimitedBackoffStrategy(Duration min_delay,
float backoff_factor,
int max_attempts) {
return CreateRetryStrategy(
CreateLimitedRetryIndicator(max_attempts),
CreateBackoffDelayCalculator(min_delay, backoff_factor));
}
bool RetryWithStrategy(RetryStrategy retry_strategy, std::function<bool()> op,
std::function<void()> reset) {
optional<Duration> delay;
int failed_attemps = 0;
for (;;) {
if (op()) {
return true;
}
if (!retry_strategy) {
return false;
}
delay = retry_strategy(++failed_attemps);
if (!delay.has_value()) {
break;
}
LOG(INFO) << "Retrying after "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
delay.value())
.count()
<< " milliseconds.";
std::this_thread::sleep_for(delay.value());
if (reset) {
reset();
}
}
return false;
}
} // namespace framework
} // namespace cloud
} // namespace cartographer

View File

@ -1,53 +0,0 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RETRY_H
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RETRY_H
#include "cartographer/common/optional.h"
#include "cartographer/common/time.h"
#include "grpc++/grpc++.h"
namespace cartographer {
namespace cloud {
namespace framework {
using common::Duration;
using common::optional;
using RetryStrategy =
std::function<optional<Duration>(int /* failed_attempts */)>;
using RetryIndicator = std::function<bool(int /* failed_attempts */)>;
using RetryDelayCalculator = std::function<Duration(int /* failed_attempts */)>;
RetryStrategy CreateRetryStrategy(RetryIndicator retry_indicator,
RetryDelayCalculator retry_delay_calculator);
RetryIndicator CreateLimitedRetryIndicator(int max_attempts);
RetryDelayCalculator CreateBackoffDelayCalculator(Duration min_delay,
float backoff_factor);
RetryStrategy CreateLimitedBackoffStrategy(Duration min_delay,
float backoff_factor,
int max_attempts);
bool RetryWithStrategy(RetryStrategy retry_strategy, std::function<bool()> op,
std::function<void()> reset = nullptr);
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RETRY_H

View File

@ -1,383 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer/cloud/internal/framework/rpc.h"
#include "cartographer/cloud/internal/framework/service.h"
#include "cartographer/common/make_unique.h"
#include "glog/logging.h"
namespace cartographer {
namespace cloud {
namespace framework {
namespace {
// Finishes the gRPC for non-streaming response RPCs, i.e. NORMAL_RPC and
// CLIENT_STREAMING. If no 'msg' is passed, we signal an error to the client as
// the server is not honoring the gRPC call signature.
template <typename ReaderWriter>
void SendUnaryFinish(ReaderWriter* reader_writer, ::grpc::Status status,
const google::protobuf::Message* msg,
Rpc::EventBase* rpc_event) {
if (msg) {
reader_writer->Finish(*msg, status, rpc_event);
} else {
reader_writer->FinishWithError(status, rpc_event);
}
}
} // namespace
void Rpc::CompletionQueueRpcEvent::Handle() {
pending = false;
rpc_ptr->service()->HandleEvent(event, rpc_ptr, ok);
}
void Rpc::InternalRpcEvent::Handle() {
if (auto rpc_shared = rpc.lock()) {
rpc_shared->service()->HandleEvent(event, rpc_shared.get(), true);
} else {
LOG(WARNING) << "Ignoring stale event.";
}
}
Rpc::Rpc(int method_index,
::grpc::ServerCompletionQueue* server_completion_queue,
EventQueue* event_queue, ExecutionContext* execution_context,
const RpcHandlerInfo& rpc_handler_info, Service* service,
WeakPtrFactory weak_ptr_factory)
: method_index_(method_index),
server_completion_queue_(server_completion_queue),
event_queue_(event_queue),
execution_context_(execution_context),
rpc_handler_info_(rpc_handler_info),
service_(service),
weak_ptr_factory_(weak_ptr_factory),
new_connection_event_(Event::NEW_CONNECTION, this),
read_event_(Event::READ, this),
write_event_(Event::WRITE, this),
finish_event_(Event::FINISH, this),
done_event_(Event::DONE, this),
handler_(rpc_handler_info_.rpc_handler_factory(this, execution_context)) {
InitializeReadersAndWriters(rpc_handler_info_.rpc_type);
// Initialize the prototypical request and response messages.
request_.reset(::google::protobuf::MessageFactory::generated_factory()
->GetPrototype(rpc_handler_info_.request_descriptor)
->New());
response_.reset(::google::protobuf::MessageFactory::generated_factory()
->GetPrototype(rpc_handler_info_.response_descriptor)
->New());
}
std::unique_ptr<Rpc> Rpc::Clone() {
return common::make_unique<Rpc>(
method_index_, server_completion_queue_, event_queue_, execution_context_,
rpc_handler_info_, service_, weak_ptr_factory_);
}
void Rpc::OnRequest() { handler_->OnRequestInternal(request_.get()); }
void Rpc::OnReadsDone() { handler_->OnReadsDone(); }
void Rpc::OnFinish() { handler_->OnFinish(); }
void Rpc::RequestNextMethodInvocation() {
// Ask gRPC to notify us when the connection terminates.
SetRpcEventState(Event::DONE, true);
// TODO(gaschler): Asan reports direct leak of this new from both calls
// StartServing and HandleNewConnection.
server_context_.AsyncNotifyWhenDone(GetRpcEvent(Event::DONE));
// Make sure after terminating the connection, gRPC notifies us with this
// event.
SetRpcEventState(Event::NEW_CONNECTION, true);
switch (rpc_handler_info_.rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
service_->RequestAsyncBidiStreaming(
method_index_, &server_context_, streaming_interface(),
server_completion_queue_, server_completion_queue_,
GetRpcEvent(Event::NEW_CONNECTION));
break;
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
service_->RequestAsyncClientStreaming(
method_index_, &server_context_, streaming_interface(),
server_completion_queue_, server_completion_queue_,
GetRpcEvent(Event::NEW_CONNECTION));
break;
case ::grpc::internal::RpcMethod::NORMAL_RPC:
service_->RequestAsyncUnary(
method_index_, &server_context_, request_.get(),
streaming_interface(), server_completion_queue_,
server_completion_queue_, GetRpcEvent(Event::NEW_CONNECTION));
break;
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
service_->RequestAsyncServerStreaming(
method_index_, &server_context_, request_.get(),
streaming_interface(), server_completion_queue_,
server_completion_queue_, GetRpcEvent(Event::NEW_CONNECTION));
break;
}
}
void Rpc::RequestStreamingReadIfNeeded() {
// For request-streaming RPCs ask the client to start sending requests.
switch (rpc_handler_info_.rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
SetRpcEventState(Event::READ, true);
async_reader_interface()->Read(request_.get(), GetRpcEvent(Event::READ));
break;
case ::grpc::internal::RpcMethod::NORMAL_RPC:
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
// For NORMAL_RPC and SERVER_STREAMING we don't need to queue an event,
// since gRPC automatically issues a READ request and places the request
// into the 'Message' we provided to 'RequestAsyncUnary' above.
OnRequest();
OnReadsDone();
break;
}
}
void Rpc::Write(std::unique_ptr<::google::protobuf::Message> message) {
EnqueueMessage(SendItem{std::move(message), ::grpc::Status::OK});
event_queue_->Push(UniqueEventPtr(
new InternalRpcEvent(Event::WRITE_NEEDED, weak_ptr_factory_(this))));
}
void Rpc::Finish(::grpc::Status status) {
EnqueueMessage(SendItem{nullptr /* message */, status});
event_queue_->Push(UniqueEventPtr(
new InternalRpcEvent(Event::WRITE_NEEDED, weak_ptr_factory_(this))));
}
void Rpc::HandleSendQueue() {
SendItem send_item;
{
common::MutexLocker locker(&send_queue_lock_);
if (send_queue_.empty() || IsRpcEventPending(Event::WRITE) ||
IsRpcEventPending(Event::FINISH)) {
return;
}
send_item = std::move(send_queue_.front());
send_queue_.pop();
}
if (!send_item.msg ||
rpc_handler_info_.rpc_type == ::grpc::internal::RpcMethod::NORMAL_RPC ||
rpc_handler_info_.rpc_type ==
::grpc::internal::RpcMethod::CLIENT_STREAMING) {
PerformFinish(std::move(send_item.msg), send_item.status);
return;
}
PerformWrite(std::move(send_item.msg), send_item.status);
}
::grpc::internal::ServerAsyncStreamingInterface* Rpc::streaming_interface() {
switch (rpc_handler_info_.rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
return server_async_reader_writer_.get();
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
return server_async_reader_.get();
case ::grpc::internal::RpcMethod::NORMAL_RPC:
return server_async_response_writer_.get();
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
return server_async_writer_.get();
}
LOG(FATAL) << "Never reached.";
}
::grpc::internal::AsyncReaderInterface<::google::protobuf::Message>*
Rpc::async_reader_interface() {
switch (rpc_handler_info_.rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
return server_async_reader_writer_.get();
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
return server_async_reader_.get();
case ::grpc::internal::RpcMethod::NORMAL_RPC:
LOG(FATAL) << "For NORMAL_RPC no streaming reader interface exists.";
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
LOG(FATAL)
<< "For SERVER_STREAMING no streaming reader interface exists.";
}
LOG(FATAL) << "Never reached.";
}
::grpc::internal::AsyncWriterInterface<::google::protobuf::Message>*
Rpc::async_writer_interface() {
switch (rpc_handler_info_.rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
return server_async_reader_writer_.get();
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
case ::grpc::internal::RpcMethod::NORMAL_RPC:
LOG(FATAL) << "For NORMAL_RPC and CLIENT_STREAMING no streaming writer "
"interface exists.";
break;
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
return server_async_writer_.get();
}
LOG(FATAL) << "Never reached.";
}
Rpc::CompletionQueueRpcEvent* Rpc::GetRpcEvent(Event event) {
switch (event) {
case Event::NEW_CONNECTION:
return &new_connection_event_;
case Event::READ:
return &read_event_;
case Event::WRITE_NEEDED:
LOG(FATAL) << "Rpc does not store Event::WRITE_NEEDED.";
break;
case Event::WRITE:
return &write_event_;
case Event::FINISH:
return &finish_event_;
case Event::DONE:
return &done_event_;
}
LOG(FATAL) << "Never reached.";
}
bool* Rpc::GetRpcEventState(Event event) {
return &GetRpcEvent(event)->pending;
}
void Rpc::EnqueueMessage(SendItem&& send_item) {
common::MutexLocker locker(&send_queue_lock_);
send_queue_.emplace(std::move(send_item));
}
void Rpc::PerformFinish(std::unique_ptr<::google::protobuf::Message> message,
::grpc::Status status) {
SetRpcEventState(Event::FINISH, true);
switch (rpc_handler_info_.rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
CHECK(!message);
server_async_reader_writer_->Finish(status, GetRpcEvent(Event::FINISH));
break;
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
response_ = std::move(message);
SendUnaryFinish(server_async_reader_.get(), status, response_.get(),
GetRpcEvent(Event::FINISH));
break;
case ::grpc::internal::RpcMethod::NORMAL_RPC:
response_ = std::move(message);
SendUnaryFinish(server_async_response_writer_.get(), status,
response_.get(), GetRpcEvent(Event::FINISH));
break;
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
CHECK(!message);
server_async_writer_->Finish(status, GetRpcEvent(Event::FINISH));
break;
}
}
void Rpc::PerformWrite(std::unique_ptr<::google::protobuf::Message> message,
::grpc::Status status) {
CHECK(message) << "PerformWrite must be called with a non-null message";
CHECK_NE(rpc_handler_info_.rpc_type, ::grpc::internal::RpcMethod::NORMAL_RPC);
CHECK_NE(rpc_handler_info_.rpc_type,
::grpc::internal::RpcMethod::CLIENT_STREAMING);
SetRpcEventState(Event::WRITE, true);
response_ = std::move(message);
async_writer_interface()->Write(*response_, GetRpcEvent(Event::WRITE));
}
void Rpc::SetRpcEventState(Event event, bool pending) {
// TODO(gaschler): Since the only usage is setting this true at creation,
// consider removing this method.
*GetRpcEventState(event) = pending;
}
bool Rpc::IsRpcEventPending(Event event) { return *GetRpcEventState(event); }
bool Rpc::IsAnyEventPending() {
return IsRpcEventPending(Rpc::Event::DONE) ||
IsRpcEventPending(Rpc::Event::READ) ||
IsRpcEventPending(Rpc::Event::WRITE) ||
IsRpcEventPending(Rpc::Event::FINISH);
}
std::weak_ptr<Rpc> Rpc::GetWeakPtr() { return weak_ptr_factory_(this); }
ActiveRpcs::ActiveRpcs() : lock_() {}
void Rpc::InitializeReadersAndWriters(
::grpc::internal::RpcMethod::RpcType rpc_type) {
switch (rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
server_async_reader_writer_ =
common::make_unique<::grpc::ServerAsyncReaderWriter<
google::protobuf::Message, google::protobuf::Message>>(
&server_context_);
break;
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
server_async_reader_ = common::make_unique<::grpc::ServerAsyncReader<
google::protobuf::Message, google::protobuf::Message>>(
&server_context_);
break;
case ::grpc::internal::RpcMethod::NORMAL_RPC:
server_async_response_writer_ = common::make_unique<
::grpc::ServerAsyncResponseWriter<google::protobuf::Message>>(
&server_context_);
break;
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
server_async_writer_ = common::make_unique<
::grpc::ServerAsyncWriter<google::protobuf::Message>>(
&server_context_);
break;
}
}
ActiveRpcs::~ActiveRpcs() {
common::MutexLocker locker(&lock_);
if (!rpcs_.empty()) {
LOG(FATAL) << "RPCs still in flight!";
}
}
std::shared_ptr<Rpc> ActiveRpcs::Add(std::unique_ptr<Rpc> rpc) {
common::MutexLocker locker(&lock_);
std::shared_ptr<Rpc> shared_ptr_rpc = std::move(rpc);
const auto result = rpcs_.emplace(shared_ptr_rpc.get(), shared_ptr_rpc);
CHECK(result.second) << "RPC already active.";
return shared_ptr_rpc;
}
bool ActiveRpcs::Remove(Rpc* rpc) {
common::MutexLocker locker(&lock_);
auto it = rpcs_.find(rpc);
if (it != rpcs_.end()) {
rpcs_.erase(it);
return true;
}
return false;
}
Rpc::WeakPtrFactory ActiveRpcs::GetWeakPtrFactory() {
return [this](Rpc* rpc) { return GetWeakPtr(rpc); };
}
std::weak_ptr<Rpc> ActiveRpcs::GetWeakPtr(Rpc* rpc) {
common::MutexLocker locker(&lock_);
auto it = rpcs_.find(rpc);
CHECK(it != rpcs_.end());
return it->second;
}
} // namespace framework
} // namespace cloud
} // namespace cartographer

View File

@ -1,213 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_H
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_H
#include <memory>
#include <queue>
#include <unordered_set>
#include "cartographer/cloud/internal/framework/execution_context.h"
#include "cartographer/cloud/internal/framework/rpc_handler_interface.h"
#include "cartographer/common/blocking_queue.h"
#include "cartographer/common/mutex.h"
#include "google/protobuf/message.h"
#include "grpc++/grpc++.h"
#include "grpc++/impl/codegen/async_stream.h"
#include "grpc++/impl/codegen/async_unary_call.h"
#include "grpc++/impl/codegen/proto_utils.h"
#include "grpc++/impl/codegen/service_type.h"
namespace cartographer {
namespace cloud {
namespace framework {
class Service;
// TODO(cschuet): Add a unittest that tests the logic of this class.
class Rpc {
public:
using WeakPtrFactory = std::function<std::weak_ptr<Rpc>(Rpc*)>;
enum class Event {
NEW_CONNECTION = 0,
READ,
WRITE_NEEDED,
WRITE,
FINISH,
DONE
};
struct EventBase {
explicit EventBase(Event event) : event(event) {}
virtual ~EventBase(){};
virtual void Handle() = 0;
const Event event;
};
class EventDeleter {
public:
enum Action { DELETE = 0, DO_NOT_DELETE };
// The default action 'DELETE' is used implicitly, for instance for a
// new UniqueEventPtr or a UniqueEventPtr that is created by
// 'return nullptr'.
EventDeleter() : action_(DELETE) {}
explicit EventDeleter(Action action) : action_(action) {}
void operator()(EventBase* e) {
if (e != nullptr && action_ == DELETE) {
delete e;
}
}
private:
Action action_;
};
using UniqueEventPtr = std::unique_ptr<EventBase, EventDeleter>;
using EventQueue = common::BlockingQueue<UniqueEventPtr>;
// Flows through gRPC's CompletionQueue and then our EventQueue.
struct CompletionQueueRpcEvent : public EventBase {
CompletionQueueRpcEvent(Event event, Rpc* rpc)
: EventBase(event), rpc_ptr(rpc), ok(false), pending(false) {}
void PushToEventQueue() {
rpc_ptr->event_queue()->Push(
UniqueEventPtr(this, EventDeleter(EventDeleter::DO_NOT_DELETE)));
}
void Handle() override;
Rpc* rpc_ptr;
bool ok;
bool pending;
};
// Flows only through our EventQueue.
struct InternalRpcEvent : public EventBase {
InternalRpcEvent(Event event, std::weak_ptr<Rpc> rpc)
: EventBase(event), rpc(rpc) {}
void Handle() override;
std::weak_ptr<Rpc> rpc;
};
Rpc(int method_index, ::grpc::ServerCompletionQueue* server_completion_queue,
EventQueue* event_queue, ExecutionContext* execution_context,
const RpcHandlerInfo& rpc_handler_info, Service* service,
WeakPtrFactory weak_ptr_factory);
std::unique_ptr<Rpc> Clone();
void OnRequest();
void OnReadsDone();
void OnFinish();
void RequestNextMethodInvocation();
void RequestStreamingReadIfNeeded();
void HandleSendQueue();
void Write(std::unique_ptr<::google::protobuf::Message> message);
void Finish(::grpc::Status status);
Service* service() { return service_; }
bool IsRpcEventPending(Event event);
bool IsAnyEventPending();
void SetEventQueue(EventQueue* event_queue) { event_queue_ = event_queue; }
EventQueue* event_queue() { return event_queue_; }
std::weak_ptr<Rpc> GetWeakPtr();
private:
struct SendItem {
std::unique_ptr<google::protobuf::Message> msg;
::grpc::Status status;
};
Rpc(const Rpc&) = delete;
Rpc& operator=(const Rpc&) = delete;
void InitializeReadersAndWriters(
::grpc::internal::RpcMethod::RpcType rpc_type);
CompletionQueueRpcEvent* GetRpcEvent(Event event);
bool* GetRpcEventState(Event event);
void SetRpcEventState(Event event, bool pending);
void EnqueueMessage(SendItem&& send_item);
void PerformFinish(std::unique_ptr<::google::protobuf::Message> message,
::grpc::Status status);
void PerformWrite(std::unique_ptr<::google::protobuf::Message> message,
::grpc::Status status);
::grpc::internal::AsyncReaderInterface<::google::protobuf::Message>*
async_reader_interface();
::grpc::internal::AsyncWriterInterface<::google::protobuf::Message>*
async_writer_interface();
::grpc::internal::ServerAsyncStreamingInterface* streaming_interface();
int method_index_;
::grpc::ServerCompletionQueue* server_completion_queue_;
EventQueue* event_queue_;
ExecutionContext* execution_context_;
RpcHandlerInfo rpc_handler_info_;
Service* service_;
WeakPtrFactory weak_ptr_factory_;
::grpc::ServerContext server_context_;
CompletionQueueRpcEvent new_connection_event_;
CompletionQueueRpcEvent read_event_;
CompletionQueueRpcEvent write_event_;
CompletionQueueRpcEvent finish_event_;
CompletionQueueRpcEvent done_event_;
std::unique_ptr<google::protobuf::Message> request_;
std::unique_ptr<google::protobuf::Message> response_;
std::unique_ptr<RpcHandlerInterface> handler_;
std::unique_ptr<::grpc::ServerAsyncResponseWriter<google::protobuf::Message>>
server_async_response_writer_;
std::unique_ptr<::grpc::ServerAsyncReader<google::protobuf::Message,
google::protobuf::Message>>
server_async_reader_;
std::unique_ptr<::grpc::ServerAsyncReaderWriter<google::protobuf::Message,
google::protobuf::Message>>
server_async_reader_writer_;
std::unique_ptr<::grpc::ServerAsyncWriter<google::protobuf::Message>>
server_async_writer_;
common::Mutex send_queue_lock_;
std::queue<SendItem> send_queue_;
};
using EventQueue = Rpc::EventQueue;
// This class keeps track of all in-flight RPCs for a 'Service'. Make sure that
// all RPCs have been terminated and removed from this object before it goes out
// of scope.
class ActiveRpcs {
public:
ActiveRpcs();
~ActiveRpcs() EXCLUDES(lock_);
std::shared_ptr<Rpc> Add(std::unique_ptr<Rpc> rpc) EXCLUDES(lock_);
bool Remove(Rpc* rpc) EXCLUDES(lock_);
Rpc::WeakPtrFactory GetWeakPtrFactory();
private:
std::weak_ptr<Rpc> GetWeakPtr(Rpc* rpc);
common::Mutex lock_;
std::map<Rpc*, std::shared_ptr<Rpc>> rpcs_;
};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_H

View File

@ -1,93 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_H
#include "cartographer/cloud/internal/framework/execution_context.h"
#include "cartographer/cloud/internal/framework/rpc.h"
#include "cartographer/cloud/internal/framework/rpc_handler_interface.h"
#include "cartographer/cloud/internal/framework/type_traits.h"
#include "glog/logging.h"
#include "google/protobuf/message.h"
#include "grpc++/grpc++.h"
namespace cartographer {
namespace cloud {
namespace framework {
template <typename Incoming, typename Outgoing>
class RpcHandler : public RpcHandlerInterface {
public:
using IncomingType = Incoming;
using OutgoingType = Outgoing;
using RequestType = StripStream<Incoming>;
using ResponseType = StripStream<Outgoing>;
class Writer {
public:
explicit Writer(std::weak_ptr<Rpc> rpc) : rpc_(std::move(rpc)) {}
bool Write(std::unique_ptr<ResponseType> message) const {
if (auto rpc = rpc_.lock()) {
rpc->Write(std::move(message));
return true;
}
return false;
}
bool WritesDone() const {
if (auto rpc = rpc_.lock()) {
rpc->Finish(::grpc::Status::OK);
return true;
}
return false;
}
private:
const std::weak_ptr<Rpc> rpc_;
};
void SetExecutionContext(ExecutionContext* execution_context) override {
execution_context_ = execution_context;
}
void SetRpc(Rpc* rpc) override { rpc_ = rpc; }
void OnRequestInternal(const ::google::protobuf::Message* request) override {
DCHECK(dynamic_cast<const RequestType*>(request));
OnRequest(static_cast<const RequestType&>(*request));
}
virtual void OnRequest(const RequestType& request) = 0;
void Finish(::grpc::Status status) { rpc_->Finish(status); }
void Send(std::unique_ptr<ResponseType> response) {
rpc_->Write(std::move(response));
}
template <typename T>
ExecutionContext::Synchronized<T> GetContext() {
return {execution_context_->lock(), execution_context_};
}
template <typename T>
T* GetUnsynchronizedContext() {
return dynamic_cast<T*>(execution_context_);
}
Writer GetWriter() { return Writer(rpc_->GetWeakPtr()); }
private:
Rpc* rpc_;
ExecutionContext* execution_context_;
};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_H

View File

@ -1,66 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_INTERFACE_H_
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_INTERFACE_H_
#include "cartographer/cloud/internal/framework/execution_context.h"
#include "cartographer/common/make_unique.h"
#include "google/protobuf/message.h"
#include "grpc++/grpc++.h"
namespace cartographer {
namespace cloud {
namespace framework {
class Rpc;
class RpcHandlerInterface {
public:
virtual ~RpcHandlerInterface() = default;
// Returns the fully qualified name of the gRPC method this handler is
// implementing. The fully qualified name has the structure
// '/<<full service name>>/<<method name>>', where the service name is the
// fully qualified proto package name of the service and method name the name
// of the method as defined in the service definition of the proto.
virtual std::string method_name() const = 0;
virtual void SetExecutionContext(ExecutionContext* execution_context) = 0;
virtual void SetRpc(Rpc* rpc) = 0;
virtual void OnRequestInternal(
const ::google::protobuf::Message* request) = 0;
virtual void OnReadsDone(){};
virtual void OnFinish(){};
template <class RpcHandlerType>
static std::unique_ptr<RpcHandlerType> Instantiate() {
return common::make_unique<RpcHandlerType>();
}
};
using RpcHandlerFactory = std::function<std::unique_ptr<RpcHandlerInterface>(
Rpc*, ExecutionContext*)>;
struct RpcHandlerInfo {
const google::protobuf::Descriptor* request_descriptor;
const google::protobuf::Descriptor* response_descriptor;
const RpcHandlerFactory rpc_handler_factory;
const ::grpc::internal::RpcMethod::RpcType rpc_type;
const std::string fully_qualified_name;
};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_INTERFACE_H_

View File

@ -1,193 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer/cloud/internal/framework/server.h"
#include "glog/logging.h"
namespace cartographer {
namespace cloud {
namespace framework {
namespace {
const common::Duration kPopEventTimeout = common::FromMilliseconds(100);
} // namespace
void Server::Builder::SetNumGrpcThreads(const size_t num_grpc_threads) {
options_.num_grpc_threads = num_grpc_threads;
}
void Server::Builder::SetNumEventThreads(const std::size_t num_event_threads) {
options_.num_event_threads = num_event_threads;
}
void Server::Builder::SetServerAddress(const std::string& server_address) {
options_.server_address = server_address;
}
std::tuple<std::string, std::string> Server::Builder::ParseMethodFullName(
const std::string& method_full_name) {
CHECK(method_full_name.at(0) == '/') << "Invalid method name.";
std::stringstream stream(method_full_name.substr(1));
std::string service_full_name;
std::getline(stream, service_full_name, '/');
std::string method_name;
std::getline(stream, method_name, '/');
CHECK(!service_full_name.empty() && !method_name.empty());
return std::make_tuple(service_full_name, method_name);
}
std::unique_ptr<Server> Server::Builder::Build() {
std::unique_ptr<Server> server(new Server(options_));
for (const auto& service_handlers : rpc_handlers_) {
server->AddService(service_handlers.first, service_handlers.second);
}
return server;
}
Server::Server(const Options& options) : options_(options) {
server_builder_.AddListeningPort(options_.server_address,
::grpc::InsecureServerCredentials());
// Set up event queue threads.
event_queue_threads_ =
std::vector<EventQueueThread>(options_.num_event_threads);
// Set up completion queues threads.
for (size_t i = 0; i < options_.num_grpc_threads; ++i) {
completion_queue_threads_.emplace_back(
server_builder_.AddCompletionQueue());
}
}
void Server::AddService(
const std::string& service_name,
const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos) {
// Instantiate and register service.
const auto result = services_.emplace(
std::piecewise_construct, std::make_tuple(service_name),
std::make_tuple(service_name, rpc_handler_infos,
[this]() { return SelectNextEventQueueRoundRobin(); }));
CHECK(result.second) << "A service named " << service_name
<< " already exists.";
server_builder_.RegisterService(&result.first->second);
}
void Server::RunCompletionQueue(
::grpc::ServerCompletionQueue* completion_queue) {
bool ok;
void* tag;
while (completion_queue->Next(&tag, &ok)) {
auto* rpc_event = static_cast<Rpc::CompletionQueueRpcEvent*>(tag);
rpc_event->ok = ok;
rpc_event->PushToEventQueue();
}
}
EventQueue* Server::SelectNextEventQueueRoundRobin() {
common::MutexLocker locker(&current_event_queue_id_lock_);
current_event_queue_id_ =
(current_event_queue_id_ + 1) % options_.num_event_threads;
return event_queue_threads_.at(current_event_queue_id_).event_queue();
}
void Server::RunEventQueue(EventQueue* event_queue) {
while (!shutting_down_) {
Rpc::UniqueEventPtr rpc_event =
event_queue->PopWithTimeout(kPopEventTimeout);
if (rpc_event) {
rpc_event->Handle();
}
}
// Finish processing the rest of the items.
while (Rpc::UniqueEventPtr rpc_event =
event_queue->PopWithTimeout(kPopEventTimeout)) {
rpc_event->Handle();
}
}
void Server::Start() {
// Start the gRPC server process.
server_ = server_builder_.BuildAndStart();
// Start serving all services on all completion queues.
for (auto& service : services_) {
service.second.StartServing(completion_queue_threads_,
execution_context_.get());
}
// Start threads to process all event queues.
for (auto& event_queue_thread : event_queue_threads_) {
event_queue_thread.Start(
[this](EventQueue* event_queue) { RunEventQueue(event_queue); });
}
// Start threads to process all completion queues.
for (auto& completion_queue_threads : completion_queue_threads_) {
completion_queue_threads.Start(
[this](::grpc::ServerCompletionQueue* completion_queue) {
RunCompletionQueue(completion_queue);
});
}
}
void Server::WaitForShutdown() {
if (!server_) {
return;
}
server_->Wait();
}
void Server::Shutdown() {
LOG(INFO) << "Shutting down server.";
shutting_down_ = true;
// Tell the services to stop serving RPCs.
for (auto& service : services_) {
service.second.StopServing();
}
// Shut down the gRPC server waiting for RPCs to finish until the hard
// deadline; then force a shutdown.
server_->Shutdown();
// Shut down the server completion queues and wait for the processing threads
// to join.
for (auto& completion_queue_threads : completion_queue_threads_) {
completion_queue_threads.Shutdown();
}
for (auto& event_queue_thread : event_queue_threads_) {
event_queue_thread.Shutdown();
}
LOG(INFO) << "Shutdown complete.";
}
void Server::SetExecutionContext(
std::unique_ptr<ExecutionContext> execution_context) {
// After the server has been started the 'ExecutionHandle' cannot be changed
// anymore.
CHECK(!server_);
execution_context_ = std::move(execution_context);
}
} // namespace framework
} // namespace cloud
} // namespace cartographer

View File

@ -1,200 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVER_H
#include <cstddef>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include "cartographer/cloud/internal/framework/completion_queue_thread.h"
#include "cartographer/cloud/internal/framework/event_queue_thread.h"
#include "cartographer/cloud/internal/framework/execution_context.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "cartographer/cloud/internal/framework/service.h"
#include "cartographer/common/make_unique.h"
#include "grpc++/grpc++.h"
namespace cartographer {
namespace cloud {
namespace framework {
class Server {
protected:
// All options that configure server behaviour such as number of threads,
// ports etc.
struct Options {
size_t num_grpc_threads;
size_t num_event_threads;
std::string server_address;
};
public:
// This 'Builder' is the only way to construct a 'Server'.
class Builder {
public:
Builder() = default;
std::unique_ptr<Server> Build();
void SetNumGrpcThreads(std::size_t num_grpc_threads);
void SetNumEventThreads(std::size_t num_event_threads);
void SetServerAddress(const std::string& server_address);
template <typename RpcHandlerType>
void RegisterHandler() {
std::string method_full_name =
RpcHandlerInterface::Instantiate<RpcHandlerType>()->method_name();
std::string service_full_name;
std::string method_name;
std::tie(service_full_name, method_name) =
ParseMethodFullName(method_full_name);
CheckHandlerCompatibility<RpcHandlerType>(service_full_name, method_name);
rpc_handlers_[service_full_name].emplace(
method_name,
RpcHandlerInfo{
RpcHandlerType::RequestType::default_instance().GetDescriptor(),
RpcHandlerType::ResponseType::default_instance().GetDescriptor(),
[](Rpc* const rpc, ExecutionContext* const execution_context) {
std::unique_ptr<RpcHandlerInterface> rpc_handler =
common::make_unique<RpcHandlerType>();
rpc_handler->SetRpc(rpc);
rpc_handler->SetExecutionContext(execution_context);
return rpc_handler;
},
RpcType<typename RpcHandlerType::IncomingType,
typename RpcHandlerType::OutgoingType>::value,
method_full_name});
}
static std::tuple<std::string /* service_full_name */,
std::string /* method_name */>
ParseMethodFullName(const std::string& method_full_name);
private:
using ServiceInfo = std::map<std::string, RpcHandlerInfo>;
template <typename RpcHandlerType>
void CheckHandlerCompatibility(const std::string& service_full_name,
const std::string& method_name) {
const auto* pool = google::protobuf::DescriptorPool::generated_pool();
const auto* service = pool->FindServiceByName(service_full_name);
CHECK(service) << "Unknown service " << service_full_name;
const auto* method_descriptor = service->FindMethodByName(method_name);
CHECK(method_descriptor) << "Unknown method " << method_name
<< " in service " << service_full_name;
const auto* request_type = method_descriptor->input_type();
CHECK_EQ(RpcHandlerType::RequestType::default_instance().GetDescriptor(),
request_type);
const auto* response_type = method_descriptor->output_type();
CHECK_EQ(RpcHandlerType::ResponseType::default_instance().GetDescriptor(),
response_type);
const auto rpc_type =
RpcType<typename RpcHandlerType::IncomingType,
typename RpcHandlerType::OutgoingType>::value;
switch (rpc_type) {
case ::grpc::internal::RpcMethod::NORMAL_RPC:
CHECK(!method_descriptor->client_streaming());
CHECK(!method_descriptor->server_streaming());
break;
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
CHECK(method_descriptor->client_streaming());
CHECK(!method_descriptor->server_streaming());
break;
case ::grpc::internal::RpcMethod::SERVER_STREAMING:
CHECK(!method_descriptor->client_streaming());
CHECK(method_descriptor->server_streaming());
break;
case ::grpc::internal::RpcMethod::BIDI_STREAMING:
CHECK(method_descriptor->client_streaming());
CHECK(method_descriptor->server_streaming());
break;
}
}
Options options_;
std::map<std::string, ServiceInfo> rpc_handlers_;
};
friend class Builder;
virtual ~Server() = default;
// Starts a server starts serving the registered services.
void Start();
// Waits for the server to shut down. Note: The server must be either shutting
// down or some other thread must call 'Shutdown()' for this function to ever
// return.
void WaitForShutdown();
// Shuts down the server and all of its services.
void Shutdown();
// Sets the server-wide context object shared between RPC handlers.
void SetExecutionContext(std::unique_ptr<ExecutionContext> execution_context);
template <typename T>
ExecutionContext::Synchronized<T> GetContext() {
return {execution_context_->lock(), execution_context_.get()};
}
template <typename T>
T* GetUnsynchronizedContext() {
return dynamic_cast<T*>(execution_context_.get());
}
protected:
Server(const Options& options);
void AddService(
const std::string& service_name,
const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos);
private:
Server(const Server&) = delete;
Server& operator=(const Server&) = delete;
void RunCompletionQueue(::grpc::ServerCompletionQueue* completion_queue);
void RunEventQueue(Rpc::EventQueue* event_queue);
Rpc::EventQueue* SelectNextEventQueueRoundRobin();
Options options_;
bool shutting_down_ = false;
// gRPC objects needed to build a server.
::grpc::ServerBuilder server_builder_;
std::unique_ptr<::grpc::Server> server_;
// Threads processing the completion queues.
std::vector<CompletionQueueThread> completion_queue_threads_;
// Threads processing RPC events.
std::vector<EventQueueThread> event_queue_threads_;
common::Mutex current_event_queue_id_lock_;
int current_event_queue_id_ = 0;
// Map of service names to services.
std::map<std::string, Service> services_;
// A context object that is shared between all implementations of
// 'RpcHandler'.
std::unique_ptr<ExecutionContext> execution_context_;
};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVER_H

View File

@ -1,265 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer/cloud/internal/framework/server.h"
#include <future>
#include "cartographer/cloud/internal/framework/client.h"
#include "cartographer/cloud/internal/framework/execution_context.h"
#include "cartographer/cloud/internal/framework/proto/math_service.pb.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "glog/logging.h"
#include "google/protobuf/descriptor.h"
#include "grpc++/grpc++.h"
#include "gtest/gtest.h"
namespace cartographer {
namespace cloud {
namespace framework {
namespace {
using EchoResponder = std::function<bool()>;
class MathServerContext : public ExecutionContext {
public:
int additional_increment() { return 10; }
std::promise<EchoResponder> echo_responder;
};
class GetSumHandler
: public RpcHandler<Stream<proto::GetSumRequest>, proto::GetSumResponse> {
public:
std::string method_name() const override {
return "/cartographer.cloud.framework.proto.Math/GetSum";
}
void OnRequest(const proto::GetSumRequest& request) override {
sum_ += GetContext<MathServerContext>()->additional_increment();
sum_ += request.input();
}
void OnReadsDone() override {
auto response = common::make_unique<proto::GetSumResponse>();
response->set_output(sum_);
Send(std::move(response));
}
private:
int sum_ = 0;
};
class GetRunningSumHandler : public RpcHandler<Stream<proto::GetSumRequest>,
Stream<proto::GetSumResponse>> {
public:
std::string method_name() const override {
return "/cartographer.cloud.framework.proto.Math/GetRunningSum";
}
void OnRequest(const proto::GetSumRequest& request) override {
sum_ += request.input();
// Respond twice to demonstrate bidirectional streaming.
auto response = common::make_unique<proto::GetSumResponse>();
response->set_output(sum_);
Send(std::move(response));
response = common::make_unique<proto::GetSumResponse>();
response->set_output(sum_);
Send(std::move(response));
}
void OnReadsDone() override { Finish(::grpc::Status::OK); }
private:
int sum_ = 0;
};
class GetSquareHandler
: public RpcHandler<proto::GetSquareRequest, proto::GetSquareResponse> {
public:
std::string method_name() const override {
return "/cartographer.cloud.framework.proto.Math/GetSquare";
}
void OnRequest(const proto::GetSquareRequest& request) override {
auto response = common::make_unique<proto::GetSquareResponse>();
response->set_output(request.input() * request.input());
Send(std::move(response));
}
};
class GetEchoHandler
: public RpcHandler<proto::GetEchoRequest, proto::GetEchoResponse> {
public:
std::string method_name() const override {
return "/cartographer.cloud.framework.proto.Math/GetEcho";
}
void OnRequest(const proto::GetEchoRequest& request) override {
int value = request.input();
Writer writer = GetWriter();
GetContext<MathServerContext>()->echo_responder.set_value(
[writer, value]() {
auto response = common::make_unique<proto::GetEchoResponse>();
response->set_output(value);
return writer.Write(std::move(response));
});
}
};
class GetSequenceHandler
: public RpcHandler<proto::GetSequenceRequest,
Stream<proto::GetSequenceResponse>> {
public:
std::string method_name() const override {
return "/cartographer.cloud.framework.proto.Math/GetSequence";
}
void OnRequest(const proto::GetSequenceRequest& request) override {
for (int i = 0; i < request.input(); ++i) {
auto response = common::make_unique<proto::GetSequenceResponse>();
response->set_output(i);
Send(std::move(response));
}
Finish(::grpc::Status::OK);
}
};
// TODO(cschuet): Due to the hard-coded part these tests will become flaky when
// run in parallel. It would be nice to find a way to solve that. gRPC also
// allows to communicate over UNIX domain sockets.
const std::string kServerAddress = "localhost:50051";
const std::size_t kNumThreads = 1;
class ServerTest : public ::testing::Test {
protected:
void SetUp() override {
Server::Builder server_builder;
server_builder.SetServerAddress(kServerAddress);
server_builder.SetNumGrpcThreads(kNumThreads);
server_builder.SetNumEventThreads(kNumThreads);
server_builder.RegisterHandler<GetSumHandler>();
server_builder.RegisterHandler<GetSquareHandler>();
server_builder.RegisterHandler<GetRunningSumHandler>();
server_builder.RegisterHandler<GetEchoHandler>();
server_builder.RegisterHandler<GetSequenceHandler>();
server_ = server_builder.Build();
client_channel_ = ::grpc::CreateChannel(
kServerAddress, ::grpc::InsecureChannelCredentials());
}
std::unique_ptr<Server> server_;
std::shared_ptr<::grpc::Channel> client_channel_;
};
TEST_F(ServerTest, StartAndStopServerTest) {
server_->Start();
server_->Shutdown();
}
TEST_F(ServerTest, ProcessRpcStreamTest) {
server_->SetExecutionContext(common::make_unique<MathServerContext>());
server_->Start();
Client<GetSumHandler> client(client_channel_);
for (int i = 0; i < 3; ++i) {
proto::GetSumRequest request;
request.set_input(i);
EXPECT_TRUE(client.Write(request));
}
EXPECT_TRUE(client.WritesDone());
EXPECT_TRUE(client.Finish().ok());
EXPECT_EQ(client.response().output(), 33);
server_->Shutdown();
}
TEST_F(ServerTest, ProcessUnaryRpcTest) {
server_->Start();
Client<GetSquareHandler> client(client_channel_);
proto::GetSquareRequest request;
request.set_input(11);
EXPECT_TRUE(client.Write(request));
EXPECT_EQ(client.response().output(), 121);
server_->Shutdown();
}
TEST_F(ServerTest, ProcessBidiStreamingRpcTest) {
server_->Start();
Client<GetRunningSumHandler> client(client_channel_);
for (int i = 0; i < 3; ++i) {
proto::GetSumRequest request;
request.set_input(i);
EXPECT_TRUE(client.Write(request));
}
client.WritesDone();
proto::GetSumResponse response;
std::list<int> expected_responses = {0, 0, 1, 1, 3, 3};
while (client.Read(&response)) {
EXPECT_EQ(expected_responses.front(), response.output());
expected_responses.pop_front();
}
EXPECT_TRUE(expected_responses.empty());
EXPECT_TRUE(client.Finish().ok());
server_->Shutdown();
}
TEST_F(ServerTest, WriteFromOtherThread) {
server_->SetExecutionContext(common::make_unique<MathServerContext>());
server_->Start();
Server* server = server_.get();
std::thread response_thread([server]() {
std::future<EchoResponder> responder_future =
server->GetContext<MathServerContext>()->echo_responder.get_future();
responder_future.wait();
auto responder = responder_future.get();
CHECK(responder());
});
Client<GetEchoHandler> client(client_channel_);
proto::GetEchoRequest request;
request.set_input(13);
EXPECT_TRUE(client.Write(request));
response_thread.join();
EXPECT_EQ(client.response().output(), 13);
server_->Shutdown();
}
TEST_F(ServerTest, ProcessServerStreamingRpcTest) {
server_->Start();
Client<GetSequenceHandler> client(client_channel_);
proto::GetSequenceRequest request;
request.set_input(12);
client.Write(request);
proto::GetSequenceResponse response;
for (int i = 0; i < 12; ++i) {
EXPECT_TRUE(client.Read(&response));
EXPECT_EQ(response.output(), i);
}
EXPECT_FALSE(client.Read(&response));
EXPECT_TRUE(client.Finish().ok());
server_->Shutdown();
}
} // namespace
} // namespace framework
} // namespace cloud
} // namespace cartographer

View File

@ -1,151 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer/cloud/internal/framework/server.h"
#include <cstdlib>
#include "glog/logging.h"
#include "grpc++/impl/codegen/proto_utils.h"
namespace cartographer {
namespace cloud {
namespace framework {
Service::Service(const std::string& service_name,
const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos,
EventQueueSelector event_queue_selector)
: rpc_handler_infos_(rpc_handler_infos),
event_queue_selector_(event_queue_selector) {
for (const auto& rpc_handler_info : rpc_handler_infos_) {
// The 'handler' below is set to 'nullptr' indicating that we want to
// handle this method asynchronously.
this->AddMethod(new ::grpc::internal::RpcServiceMethod(
rpc_handler_info.second.fully_qualified_name.c_str(),
rpc_handler_info.second.rpc_type, nullptr /* handler */));
}
}
void Service::StartServing(
std::vector<CompletionQueueThread>& completion_queue_threads,
ExecutionContext* execution_context) {
int i = 0;
for (const auto& rpc_handler_info : rpc_handler_infos_) {
for (auto& completion_queue_thread : completion_queue_threads) {
std::shared_ptr<Rpc> rpc = active_rpcs_.Add(common::make_unique<Rpc>(
i, completion_queue_thread.completion_queue(),
event_queue_selector_(), execution_context, rpc_handler_info.second,
this, active_rpcs_.GetWeakPtrFactory()));
rpc->RequestNextMethodInvocation();
}
++i;
}
}
void Service::StopServing() { shutting_down_ = true; }
void Service::HandleEvent(Rpc::Event event, Rpc* rpc, bool ok) {
switch (event) {
case Rpc::Event::NEW_CONNECTION:
HandleNewConnection(rpc, ok);
break;
case Rpc::Event::READ:
HandleRead(rpc, ok);
break;
case Rpc::Event::WRITE_NEEDED:
case Rpc::Event::WRITE:
HandleWrite(rpc, ok);
break;
case Rpc::Event::FINISH:
HandleFinish(rpc, ok);
break;
case Rpc::Event::DONE:
HandleDone(rpc, ok);
break;
}
}
void Service::HandleNewConnection(Rpc* rpc, bool ok) {
if (shutting_down_) {
if (ok) {
LOG(WARNING) << "Server shutting down. Refusing to handle new RPCs.";
}
active_rpcs_.Remove(rpc);
return;
}
if (!ok) {
LOG(ERROR) << "Failed to establish connection for unknown reason.";
active_rpcs_.Remove(rpc);
}
if (ok) {
// For request-streaming RPCs ask the client to start sending requests.
rpc->RequestStreamingReadIfNeeded();
}
// Create new active rpc to handle next connection and register it for the
// incoming connection. Assign event queue in a round-robin fashion.
std::unique_ptr<Rpc> new_rpc = rpc->Clone();
new_rpc->SetEventQueue(event_queue_selector_());
active_rpcs_.Add(std::move(new_rpc))->RequestNextMethodInvocation();
}
void Service::HandleRead(Rpc* rpc, bool ok) {
if (ok) {
rpc->OnRequest();
rpc->RequestStreamingReadIfNeeded();
return;
}
// Reads completed.
rpc->OnReadsDone();
RemoveIfNotPending(rpc);
}
void Service::HandleWrite(Rpc* rpc, bool ok) {
if (!ok) {
LOG(ERROR) << "Write failed";
}
// Send the next message or potentially finish the connection.
rpc->HandleSendQueue();
RemoveIfNotPending(rpc);
}
void Service::HandleFinish(Rpc* rpc, bool ok) {
if (!ok) {
LOG(ERROR) << "Finish failed";
}
rpc->OnFinish();
RemoveIfNotPending(rpc);
}
void Service::HandleDone(Rpc* rpc, bool ok) { RemoveIfNotPending(rpc); }
void Service::RemoveIfNotPending(Rpc* rpc) {
if (!rpc->IsAnyEventPending()) {
active_rpcs_.Remove(rpc);
}
}
} // namespace framework
} // namespace cloud
} // namespace cartographer

View File

@ -1,67 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVICE_H
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVICE_H
#include "cartographer/cloud/internal/framework/completion_queue_thread.h"
#include "cartographer/cloud/internal/framework/event_queue_thread.h"
#include "cartographer/cloud/internal/framework/execution_context.h"
#include "cartographer/cloud/internal/framework/rpc.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "grpc++/impl/codegen/service_type.h"
namespace cartographer {
namespace cloud {
namespace framework {
// A 'Service' represents a generic service for gRPC asynchronous methods and is
// responsible for managing the lifetime of active RPCs issued against methods
// of the service and distributing incoming gRPC events to their respective
// 'Rpc' handler objects.
class Service : public ::grpc::Service {
public:
using EventQueueSelector = std::function<EventQueue*()>;
friend class Rpc;
Service(const std::string& service_name,
const std::map<std::string, RpcHandlerInfo>& rpc_handlers,
EventQueueSelector event_queue_selector);
void StartServing(std::vector<CompletionQueueThread>& completion_queues,
ExecutionContext* execution_context);
void HandleEvent(Rpc::Event event, Rpc* rpc, bool ok);
void StopServing();
private:
void HandleNewConnection(Rpc* rpc, bool ok);
void HandleRead(Rpc* rpc, bool ok);
void HandleWrite(Rpc* rpc, bool ok);
void HandleFinish(Rpc* rpc, bool ok);
void HandleDone(Rpc* rpc, bool ok);
void RemoveIfNotPending(Rpc* rpc);
std::map<std::string, RpcHandlerInfo> rpc_handler_infos_;
EventQueueSelector event_queue_selector_;
ActiveRpcs active_rpcs_;
bool shutting_down_ = false;
};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVICE_H

View File

@ -1,141 +0,0 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_TEST_SERVER_H_
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_TEST_SERVER_H_
#include <functional>
#include <string>
#include "cartographer/cloud/internal/framework/client.h"
#include "cartographer/cloud/internal/framework/rpc_handler_interface.h"
#include "cartographer/cloud/internal/framework/server.h"
#include "cartographer/cloud/internal/framework/testing/rpc_handler_wrapper.h"
#include "cartographer/common/blocking_queue.h"
#include "grpc++/grpc++.h"
#include "gtest/gtest.h"
namespace cartographer {
namespace cloud {
namespace framework {
namespace testing {
namespace {
const std::string kServerAddress = "localhost:50051";
} // namespace
template <typename RpcHandlerType>
class RpcHandlerTestServer : public Server {
public:
RpcHandlerTestServer(std::unique_ptr<ExecutionContext> execution_context)
: Server(Options{1, 1, kServerAddress}),
channel_(::grpc::CreateChannel(kServerAddress,
::grpc::InsecureChannelCredentials())),
client_(channel_) {
std::string method_full_name_under_test =
RpcHandlerInterface::Instantiate<RpcHandlerType>()->method_name();
std::string service_full_name;
std::string method_name;
std::tie(service_full_name, method_name) =
Server::Builder::ParseMethodFullName(method_full_name_under_test);
this->AddService(
service_full_name,
{{method_name, GetRpcHandlerInfo(method_full_name_under_test)}});
this->SetExecutionContext(std::move(execution_context));
this->Start();
}
~RpcHandlerTestServer() { this->Shutdown(); };
void SendWrite(const typename RpcHandlerType::RequestType &message) {
EXPECT_TRUE(client_.Write(message));
WaitForHandlerCompletion(RpcHandlerWrapper<RpcHandlerType>::ON_REQUEST);
}
// Parses a request message from the passed string and issues the
// request against the handler, waits for the handler to complete
// processing before returning.
void SendWrite(const std::string &serialized_message) {
typename RpcHandlerType::RequestType message;
message.ParseFromString(serialized_message);
Write(message);
}
// Sends a WRITES_DONE event to the handler, waits for the handler
// to finish processing the READS_DONE event before returning.
void SendWritesDone() {
EXPECT_TRUE(client_.WritesDone());
WaitForHandlerCompletion(RpcHandlerWrapper<RpcHandlerType>::ON_READS_DONE);
}
// Sends a FINISH event to the handler under test, waits for the
// handler to finish processing the event before returning.
void SendFinish() {
EXPECT_TRUE(client_.Finish().ok());
WaitForHandlerCompletion(RpcHandlerWrapper<RpcHandlerType>::ON_FINISH);
}
const typename RpcHandlerType::ResponseType &response() {
return client_.response();
}
private:
using ClientWriter = ::grpc::internal::ClientWriterFactory<
typename RpcHandlerType::RequestType>;
void WaitForHandlerCompletion(
typename RpcHandlerWrapper<RpcHandlerType>::RpcHandlerEvent event) {
CHECK_EQ(rpc_handler_event_queue_.Pop(), event);
}
RpcHandlerInfo GetRpcHandlerInfo(const std::string &method_full_name) {
::grpc::internal::RpcMethod::RpcType rpc_type =
RpcType<typename RpcHandlerType::IncomingType,
typename RpcHandlerType::OutgoingType>::value;
auto event_callback =
[this](
typename RpcHandlerWrapper<RpcHandlerType>::RpcHandlerEvent event) {
rpc_handler_event_queue_.Push(event);
};
auto handler_instantiator = [event_callback](
Rpc *const rpc,
ExecutionContext *const execution_context) {
std::unique_ptr<RpcHandlerInterface> rpc_handler =
common::make_unique<RpcHandlerWrapper<RpcHandlerType>>(
event_callback);
rpc_handler->SetRpc(rpc);
rpc_handler->SetExecutionContext(execution_context);
return rpc_handler;
};
return RpcHandlerInfo{
RpcHandlerType::RequestType::default_instance().GetDescriptor(),
RpcHandlerType::ResponseType::default_instance().GetDescriptor(),
handler_instantiator, rpc_type, method_full_name};
}
std::shared_ptr<::grpc::Channel> channel_;
cloud::framework::Client<RpcHandlerType> client_;
common::BlockingQueue<
typename RpcHandlerWrapper<RpcHandlerType>::RpcHandlerEvent>
rpc_handler_event_queue_;
};
} // namespace testing
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_TEST_SERVER_H_

View File

@ -1,60 +0,0 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_WRAPPER_H_
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_WRAPPER_H_
#include <functional>
namespace cartographer {
namespace cloud {
namespace framework {
namespace testing {
template <class RpcHandlerType>
class RpcHandlerWrapper : public RpcHandlerType {
public:
enum RpcHandlerEvent { ON_REQUEST, ON_READS_DONE, ON_FINISH };
using EventCallback = std::function<void(RpcHandlerEvent)>;
RpcHandlerWrapper(EventCallback event_callback)
: event_callback_(event_callback) {}
void OnRequest(const typename RpcHandlerType::RequestType &request) override {
RpcHandlerType::OnRequest(request);
event_callback_(ON_REQUEST);
}
void OnReadsDone() override {
RpcHandlerType::OnReadsDone();
event_callback_(ON_READS_DONE);
}
void OnFinish() override {
RpcHandlerType::OnFinish();
event_callback_(ON_FINISH);
}
private:
EventCallback event_callback_;
};
} // namespace testing
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_WRAPPER_H_

View File

@ -1,71 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TYPE_TRAITS_H_
#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TYPE_TRAITS_H_
#include <grpc++/grpc++.h>
namespace cartographer {
namespace cloud {
namespace framework {
template <typename Request>
class Stream {
using type = Request;
};
template <template <typename> class, typename T>
struct Strip {
using type = T;
};
template <template <typename> class T, typename Param>
struct Strip<T, T<Param>> {
using type = Param;
};
template <typename T>
using StripStream = typename Strip<Stream, T>::type;
template <typename Incoming, typename Outgoing>
struct RpcType
: public std::integral_constant<::grpc::internal::RpcMethod::RpcType,
::grpc::internal::RpcMethod::NORMAL_RPC> {};
template <typename Incoming, typename Outgoing>
struct RpcType<Stream<Incoming>, Outgoing>
: public std::integral_constant<
::grpc::internal::RpcMethod::RpcType,
::grpc::internal::RpcMethod::CLIENT_STREAMING> {};
template <typename Incoming, typename Outgoing>
struct RpcType<Incoming, Stream<Outgoing>>
: public std::integral_constant<
::grpc::internal::RpcMethod::RpcType,
::grpc::internal::RpcMethod::SERVER_STREAMING> {};
template <typename Incoming, typename Outgoing>
struct RpcType<Stream<Incoming>, Stream<Outgoing>>
: public std::integral_constant<
::grpc::internal::RpcMethod::RpcType,
::grpc::internal::RpcMethod::BIDI_STREAMING> {};
} // namespace framework
} // namespace cloud
} // namespace cartographer
#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TYPE_TRAITS_H_

View File

@ -1,45 +0,0 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer/cloud/internal/framework/type_traits.h"
#include "gtest/gtest.h"
namespace cartographer {
namespace cloud {
namespace framework {
namespace {
TEST(TypeTraitsTest, StreamStripping) {
::testing::StaticAssertTypeEq<StripStream<Stream<int>>, int>();
::testing::StaticAssertTypeEq<StripStream<int>, int>();
}
TEST(TypeTraitsTest, RpcTypes) {
EXPECT_EQ((RpcType<int, int>::value),
::grpc::internal::RpcMethod::NORMAL_RPC);
EXPECT_EQ((RpcType<Stream<int>, int>::value),
::grpc::internal::RpcMethod::CLIENT_STREAMING);
EXPECT_EQ((RpcType<int, Stream<int>>::value),
::grpc::internal::RpcMethod::SERVER_STREAMING);
EXPECT_EQ((RpcType<Stream<int>, Stream<int>>::value),
::grpc::internal::RpcMethod::BIDI_STREAMING);
}
} // namespace
} // namespace framework
} // namespace cloud
} // namespace cartographer

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/add_fixed_frame_pose_data_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/internal/sensor/serialization.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_FIXED_FRAME_POSE_DATA_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_FIXED_FRAME_POSE_DATA_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,14 +25,15 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
AddFixedFramePoseDataSignature,
async_grpc::Stream<proto::AddFixedFramePoseDataRequest>,
google::protobuf::Empty,
"/cartographer.cloud.proto.MapBuilderService/AddFixedFramePoseData")
class AddFixedFramePoseDataHandler
: public framework::RpcHandler<
framework::Stream<proto::AddFixedFramePoseDataRequest>,
google::protobuf::Empty> {
: public async_grpc::RpcHandler<AddFixedFramePoseDataSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/AddFixedFramePoseData";
}
void OnRequest(const proto::AddFixedFramePoseDataRequest &request) override;
void OnReadsDone() override;
};

View File

@ -48,7 +48,8 @@ const std::string kMessage = R"(
})";
using AddFixedFramePoseDataHandlerTest =
testing::HandlerTest<AddFixedFramePoseDataHandler>;
testing::HandlerTest<AddFixedFramePoseDataSignature,
AddFixedFramePoseDataHandler>;
TEST_F(AddFixedFramePoseDataHandlerTest, NoLocalSlamUploader) {
proto::AddFixedFramePoseDataRequest request;

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/add_imu_data_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/internal/sensor/serialization.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_IMU_DATA_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_IMU_DATA_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,13 +25,13 @@ namespace cartographer {
namespace cloud {
namespace handlers {
class AddImuDataHandler
: public framework::RpcHandler<framework::Stream<proto::AddImuDataRequest>,
google::protobuf::Empty> {
DEFINE_HANDLER_SIGNATURE(
AddImuDataSignature, async_grpc::Stream<proto::AddImuDataRequest>,
google::protobuf::Empty,
"/cartographer.cloud.proto.MapBuilderService/AddImuData")
class AddImuDataHandler : public async_grpc::RpcHandler<AddImuDataSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/AddImuData";
}
void OnRequest(const proto::AddImuDataRequest &request) override;
void OnReadsDone() override;
};

View File

@ -45,7 +45,8 @@ const std::string kMessage = R"(
}
})";
using AddImuDataHandlerTest = testing::HandlerTest<AddImuDataHandler>;
using AddImuDataHandlerTest =
testing::HandlerTest<AddImuDataSignature, AddImuDataHandler>;
TEST_F(AddImuDataHandlerTest, NoLocalSlamUploader) {
proto::AddImuDataRequest request;

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/add_landmark_data_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/internal/sensor/serialization.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_LANDMARK_DATA_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_LANDMARK_DATA_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,14 +25,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
AddLandmarkDataSignature, async_grpc::Stream<proto::AddLandmarkDataRequest>,
google::protobuf::Empty,
"/cartographer.cloud.proto.MapBuilderService/AddLandmarkData")
class AddLandmarkDataHandler
: public framework::RpcHandler<
framework::Stream<proto::AddLandmarkDataRequest>,
google::protobuf::Empty> {
: public async_grpc::RpcHandler<AddLandmarkDataSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/AddLandmarkData";
}
void OnRequest(const proto::AddLandmarkDataRequest &request) override;
void OnReadsDone() override;
};

View File

@ -52,7 +52,8 @@ const std::string kMessage = R"(
}
})";
using AddLandmarkDataHandlerTest = testing::HandlerTest<AddLandmarkDataHandler>;
using AddLandmarkDataHandlerTest =
testing::HandlerTest<AddLandmarkDataSignature, AddLandmarkDataHandler>;
TEST_F(AddLandmarkDataHandlerTest, NoLocalSlamUploader) {
proto::AddLandmarkDataRequest request;

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/add_local_slam_result_data_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_LOCAL_SLAM_RESULT_DATA_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_LOCAL_SLAM_RESULT_DATA_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,14 +25,15 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
AddLocalSlamResultDataSignature,
async_grpc::Stream<proto::AddLocalSlamResultDataRequest>,
google::protobuf::Empty,
"/cartographer.cloud.proto.MapBuilderService/AddLocalSlamResultData")
class AddLocalSlamResultDataHandler
: public framework::RpcHandler<
framework::Stream<proto::AddLocalSlamResultDataRequest>,
google::protobuf::Empty> {
: public async_grpc::RpcHandler<AddLocalSlamResultDataSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/AddLocalSlamResultData";
}
void OnRequest(const proto::AddLocalSlamResultDataRequest& request) override;
void OnReadsDone() override;
};

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/add_odometry_data_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/internal/sensor/serialization.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_ODOMETRY_DATA_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_ODOMETRY_DATA_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,14 +25,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
AddOdometryDataSignature, async_grpc::Stream<proto::AddOdometryDataRequest>,
google::protobuf::Empty,
"/cartographer.cloud.proto.MapBuilderService/AddOdometryData")
class AddOdometryDataHandler
: public framework::RpcHandler<
framework::Stream<proto::AddOdometryDataRequest>,
google::protobuf::Empty> {
: public async_grpc::RpcHandler<AddOdometryDataSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/AddOdometryData";
}
void OnRequest(const proto::AddOdometryDataRequest &request) override;
void OnReadsDone() override;
};

View File

@ -47,7 +47,8 @@ const std::string kMessage = R"(
}
})";
using AddOdometryDataHandlerTest = testing::HandlerTest<AddOdometryDataHandler>;
using AddOdometryDataHandlerTest =
testing::HandlerTest<AddOdometryDataSignature, AddOdometryDataHandler>;
TEST_F(AddOdometryDataHandlerTest, NoLocalSlamUploader) {
proto::AddOdometryDataRequest request;

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/add_rangefinder_data_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_RANGEFINDER_DATA_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_RANGEFINDER_DATA_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,14 +25,15 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
AddRangefinderDataSignature,
async_grpc::Stream<proto::AddRangefinderDataRequest>,
google::protobuf::Empty,
"/cartographer.cloud.proto.MapBuilderService/AddRangefinderData")
class AddRangefinderDataHandler
: public framework::RpcHandler<
framework::Stream<proto::AddRangefinderDataRequest>,
google::protobuf::Empty> {
: public async_grpc::RpcHandler<AddRangefinderDataSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/AddRangefinderData";
}
void OnRequest(const proto::AddRangefinderDataRequest &request) override;
void OnReadsDone() override;
};

View File

@ -46,7 +46,8 @@ const std::string kMessage = R"(
})";
using AddRangefinderDataHandlerTest =
testing::HandlerTest<AddRangefinderDataHandler>;
testing::HandlerTest<AddRangefinderDataSignature,
AddRangefinderDataHandler>;
TEST_F(AddRangefinderDataHandlerTest, NoLocalSlamUploader) {
proto::AddRangefinderDataRequest request;

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/add_trajectory_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/internal/sensor/serialization.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"

View File

@ -17,20 +17,21 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_TRAJECTORY_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_ADD_TRAJECTORY_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
AddTrajectorySignature, proto::AddTrajectoryRequest,
proto::AddTrajectoryResponse,
"/cartographer.cloud.proto.MapBuilderService/AddTrajectory")
class AddTrajectoryHandler
: public framework::RpcHandler<proto::AddTrajectoryRequest,
proto::AddTrajectoryResponse> {
: public async_grpc::RpcHandler<AddTrajectorySignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/AddTrajectory";
}
void OnRequest(const proto::AddTrajectoryRequest& request) override;
};

View File

@ -66,10 +66,11 @@ const std::string kMessage = R"(
)";
class AddTrajectoryHandlerTest
: public testing::HandlerTest<AddTrajectoryHandler> {
: public testing::HandlerTest<AddTrajectorySignature,
AddTrajectoryHandler> {
public:
void SetUp() override {
testing::HandlerTest<AddTrajectoryHandler>::SetUp();
testing::HandlerTest<AddTrajectorySignature, AddTrajectoryHandler>::SetUp();
mock_map_builder_ = common::make_unique<mapping::testing::MockMapBuilder>();
EXPECT_CALL(*mock_map_builder_context_,
GetLocalSlamResultCallbackForSubscriptions())

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_FINISH_TRAJECTORY_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_FINISH_TRAJECTORY_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,13 +25,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
FinishTrajectorySignature, proto::FinishTrajectoryRequest,
google::protobuf::Empty,
"/cartographer.cloud.proto.MapBuilderService/FinishTrajectory")
class FinishTrajectoryHandler
: public framework::RpcHandler<proto::FinishTrajectoryRequest,
google::protobuf::Empty> {
: public async_grpc::RpcHandler<FinishTrajectorySignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/FinishTrajectory";
}
void OnRequest(const proto::FinishTrajectoryRequest& request) override;
};

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/get_all_submap_poses.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_ALL_SUBMAP_POSES_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_ALL_SUBMAP_POSES_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,13 +25,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
GetAllSubmapPosesSignature, google::protobuf::Empty,
proto::GetAllSubmapPosesResponse,
"/cartographer.cloud.proto.MapBuilderService/GetAllSubmapPoses")
class GetAllSubmapPosesHandler
: public framework::RpcHandler<google::protobuf::Empty,
proto::GetAllSubmapPosesResponse> {
: public async_grpc::RpcHandler<GetAllSubmapPosesSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/GetAllSubmapPoses";
}
void OnRequest(const google::protobuf::Empty& request) override;
};

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/get_constraints_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_CONSTRAINTS_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_CONSTRAINTS_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,13 +25,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
GetConstraintsSignature, google::protobuf::Empty,
proto::GetConstraintsResponse,
"/cartographer.cloud.proto.MapBuilderService/GetConstraints")
class GetConstraintsHandler
: public framework::RpcHandler<google::protobuf::Empty,
proto::GetConstraintsResponse> {
: public async_grpc::RpcHandler<GetConstraintsSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/GetConstraints";
}
void OnRequest(const google::protobuf::Empty& request) override;
};

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/get_landmark_poses_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_LANDMARK_POSES_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_LANDMARK_POSES_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,13 +25,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
GetLandmarkPosesSignature, google::protobuf::Empty,
proto::GetLandmarkPosesResponse,
"/cartographer.cloud.proto.MapBuilderService/GetLandmarkPoses")
class GetLandmarkPosesHandler
: public framework::RpcHandler<google::protobuf::Empty,
proto::GetLandmarkPosesResponse> {
: public async_grpc::RpcHandler<GetLandmarkPosesSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/GetLandmarkPoses";
}
void OnRequest(const google::protobuf::Empty& request) override;
};

View File

@ -57,7 +57,7 @@ const std::string kMessage = R"(
)";
using GetLandmarkPosesHandlerTest =
testing::HandlerTest<GetLandmarkPosesHandler>;
testing::HandlerTest<GetLandmarkPosesSignature, GetLandmarkPosesHandler>;
TEST_F(GetLandmarkPosesHandlerTest, NoLocalSlamUploader) {
std::map<std::string, Rigid3d> landmark_poses{

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/get_local_to_global_transform_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/transform/transform.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_LOCAL_TO_GLOBAL_TRANSFORM_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_LOCAL_TO_GLOBAL_TRANSFORM_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,14 +25,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
GetLocalToGlobalTransformSignature, proto::GetLocalToGlobalTransformRequest,
proto::GetLocalToGlobalTransformResponse,
"/cartographer.cloud.proto.MapBuilderService/GetLocalToGlobalTransform")
class GetLocalToGlobalTransformHandler
: public framework::RpcHandler<proto::GetLocalToGlobalTransformRequest,
proto::GetLocalToGlobalTransformResponse> {
: public async_grpc::RpcHandler<GetLocalToGlobalTransformSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/"
"GetLocalToGlobalTransform";
}
void OnRequest(
const proto::GetLocalToGlobalTransformRequest& request) override;
};

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/get_submap_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_SUBMAP_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_SUBMAP_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,13 +25,12 @@ namespace cartographer {
namespace cloud {
namespace handlers {
class GetSubmapHandler
: public framework::RpcHandler<proto::GetSubmapRequest,
proto::GetSubmapResponse> {
DEFINE_HANDLER_SIGNATURE(
GetSubmapSignature, proto::GetSubmapRequest, proto::GetSubmapResponse,
"/cartographer.cloud.proto.MapBuilderService/GetSubmap")
class GetSubmapHandler : public async_grpc::RpcHandler<GetSubmapSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/GetSubmap";
}
void OnRequest(const proto::GetSubmapRequest &request) override;
};

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/get_trajectory_node_poses_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_TRAJECTORY_NODE_POSES_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_GET_TRAJECTORY_NODE_POSES_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,13 +25,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
GetTrajectoryNodePosesSignature, google::protobuf::Empty,
proto::GetTrajectoryNodePosesResponse,
"/cartographer.cloud.proto.MapBuilderService/GetTrajectoryNodePoses")
class GetTrajectoryNodePosesHandler
: public framework::RpcHandler<google::protobuf::Empty,
proto::GetTrajectoryNodePosesResponse> {
: public async_grpc::RpcHandler<GetTrajectoryNodePosesSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/GetTrajectoryNodePoses";
}
void OnRequest(const google::protobuf::Empty& request) override;
};

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/load_state_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_LOAD_STATE_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_LOAD_STATE_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/io/internal/in_memory_proto_stream.h"
#include "google/protobuf/empty.pb.h"
@ -26,13 +26,13 @@ namespace cartographer {
namespace cloud {
namespace handlers {
class LoadStateHandler
: public framework::RpcHandler<framework::Stream<proto::LoadStateRequest>,
google::protobuf::Empty> {
DEFINE_HANDLER_SIGNATURE(
LoadStateSignature, async_grpc::Stream<proto::LoadStateRequest>,
google::protobuf::Empty,
"/cartographer.cloud.proto.MapBuilderService/LoadState")
class LoadStateHandler : public async_grpc::RpcHandler<LoadStateSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/LoadState";
}
void OnRequest(const proto::LoadStateRequest& request) override;
void OnReadsDone() override;

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/receive_local_slam_results_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -19,7 +19,7 @@
#include <memory>
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
@ -27,15 +27,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
ReceiveLocalSlamResultsSignature, proto::ReceiveLocalSlamResultsRequest,
async_grpc::Stream<proto::ReceiveLocalSlamResultsResponse>,
"/cartographer.cloud.proto.MapBuilderService/ReceiveLocalSlamResults")
class ReceiveLocalSlamResultsHandler
: public framework::RpcHandler<
proto::ReceiveLocalSlamResultsRequest,
framework::Stream<proto::ReceiveLocalSlamResultsResponse>> {
: public async_grpc::RpcHandler<ReceiveLocalSlamResultsSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/"
"ReceiveLocalSlamResults";
}
void OnRequest(const proto::ReceiveLocalSlamResultsRequest& request) override;
void OnFinish() override;

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/run_final_optimization_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/make_unique.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_RUN_FINAL_OPTIMIZATION_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_RUN_FINAL_OPTIMIZATION_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,13 +25,14 @@ namespace cartographer {
namespace cloud {
namespace handlers {
DEFINE_HANDLER_SIGNATURE(
RunFinalOptimizationSignature, google::protobuf::Empty,
google::protobuf::Empty,
"/cartographer.cloud.proto.MapBuilderService/RunFinalOptimization")
class RunFinalOptimizationHandler
: public framework::RpcHandler<google::protobuf::Empty,
google::protobuf::Empty> {
: public async_grpc::RpcHandler<RunFinalOptimizationSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/RunFinalOptimization";
}
void OnRequest(const google::protobuf::Empty& request) override;
};

View File

@ -16,7 +16,7 @@
#include "cartographer/cloud/internal/handlers/write_state_handler.h"
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/internal/map_builder_context_interface.h"
#include "cartographer/cloud/internal/map_builder_server.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_WRITE_STATE_HANDLER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_HANDLERS_WRITE_STATE_HANDLER_H
#include "cartographer/cloud/internal/framework/rpc_handler.h"
#include "async_grpc/rpc_handler.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "google/protobuf/empty.pb.h"
@ -25,13 +25,13 @@ namespace cartographer {
namespace cloud {
namespace handlers {
class WriteStateHandler : public framework::RpcHandler<
google::protobuf::Empty,
framework::Stream<proto::WriteStateResponse>> {
DEFINE_HANDLER_SIGNATURE(
WriteStateSignature, google::protobuf::Empty,
async_grpc::Stream<proto::WriteStateResponse>,
"/cartographer.cloud.proto.MapBuilderService/WriteState")
class WriteStateHandler : public async_grpc::RpcHandler<WriteStateSignature> {
public:
std::string method_name() const override {
return "/cartographer.cloud.proto.MapBuilderService/WriteState";
}
void OnRequest(const google::protobuf::Empty& request) override;
};

View File

@ -19,7 +19,7 @@
#include <map>
#include <thread>
#include "cartographer/cloud/internal/framework/client.h"
#include "async_grpc/client.h"
#include "cartographer/cloud/internal/handlers/add_fixed_frame_pose_data_handler.h"
#include "cartographer/cloud/internal/handlers/add_imu_data_handler.h"
#include "cartographer/cloud/internal/handlers/add_landmark_data_handler.h"
@ -29,6 +29,7 @@
#include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h"
#include "cartographer/cloud/internal/sensor/serialization.h"
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/common/blocking_queue.h"
#include "cartographer/common/make_unique.h"
#include "glog/logging.h"
#include "grpc++/grpc++.h"
@ -83,38 +84,38 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
common::BlockingQueue<std::unique_ptr<google::protobuf::Message>> send_queue_;
bool shutting_down_ = false;
std::unique_ptr<std::thread> upload_thread_;
std::unique_ptr<framework::Client<handlers::AddFixedFramePoseDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddFixedFramePoseDataSignature>>
add_fixed_frame_pose_client_;
std::unique_ptr<framework::Client<handlers::AddImuDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddImuDataSignature>>
add_imu_client_;
std::unique_ptr<framework::Client<handlers::AddLocalSlamResultDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddLocalSlamResultDataSignature>>
add_local_slam_result_client_;
std::unique_ptr<framework::Client<handlers::AddOdometryDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddOdometryDataSignature>>
add_odometry_client_;
std::unique_ptr<framework::Client<handlers::AddLandmarkDataHandler>>
std::unique_ptr<async_grpc::Client<handlers::AddLandmarkDataSignature>>
add_landmark_client_;
};
LocalTrajectoryUploader::~LocalTrajectoryUploader() {
if (add_imu_client_) {
CHECK(add_imu_client_->WritesDone());
CHECK(add_imu_client_->Finish().ok());
CHECK(add_imu_client_->StreamWritesDone());
CHECK(add_imu_client_->StreamFinish().ok());
}
if (add_odometry_client_) {
CHECK(add_odometry_client_->WritesDone());
CHECK(add_odometry_client_->Finish().ok());
CHECK(add_odometry_client_->StreamWritesDone());
CHECK(add_odometry_client_->StreamFinish().ok());
}
if (add_fixed_frame_pose_client_) {
CHECK(add_fixed_frame_pose_client_->WritesDone());
CHECK(add_fixed_frame_pose_client_->Finish().ok());
CHECK(add_fixed_frame_pose_client_->StreamWritesDone());
CHECK(add_fixed_frame_pose_client_->StreamFinish().ok());
}
if (add_local_slam_result_client_) {
CHECK(add_local_slam_result_client_->WritesDone());
CHECK(add_local_slam_result_client_->Finish().ok());
CHECK(add_local_slam_result_client_->StreamWritesDone());
CHECK(add_local_slam_result_client_->StreamFinish().ok());
}
if (add_landmark_client_) {
CHECK(add_landmark_client_->WritesDone());
CHECK(add_landmark_client_->Finish().ok());
CHECK(add_landmark_client_->StreamWritesDone());
CHECK(add_landmark_client_->StreamFinish().ok());
}
}
@ -173,9 +174,9 @@ void LocalTrajectoryUploader::TranslateTrajectoryId(
void LocalTrajectoryUploader::ProcessFixedFramePoseDataMessage(
proto::AddFixedFramePoseDataRequest *data_request) {
if (!add_fixed_frame_pose_client_) {
add_fixed_frame_pose_client_ =
make_unique<framework::Client<handlers::AddFixedFramePoseDataHandler>>(
client_channel_);
add_fixed_frame_pose_client_ = make_unique<
async_grpc::Client<handlers::AddFixedFramePoseDataSignature>>(
client_channel_);
}
TranslateTrajectoryId(data_request->mutable_sensor_metadata());
CHECK(add_fixed_frame_pose_client_->Write(*data_request));
@ -185,7 +186,7 @@ void LocalTrajectoryUploader::ProcessImuDataMessage(
proto::AddImuDataRequest *data_request) {
if (!add_imu_client_) {
add_imu_client_ =
make_unique<framework::Client<handlers::AddImuDataHandler>>(
make_unique<async_grpc::Client<handlers::AddImuDataSignature>>(
client_channel_);
}
TranslateTrajectoryId(data_request->mutable_sensor_metadata());
@ -196,7 +197,7 @@ void LocalTrajectoryUploader::ProcessOdometryDataMessage(
proto::AddOdometryDataRequest *data_request) {
if (!add_odometry_client_) {
add_odometry_client_ =
make_unique<framework::Client<handlers::AddOdometryDataHandler>>(
make_unique<async_grpc::Client<handlers::AddOdometryDataSignature>>(
client_channel_);
}
TranslateTrajectoryId(data_request->mutable_sensor_metadata());
@ -207,7 +208,7 @@ void LocalTrajectoryUploader::ProcessLandmarkDataMessage(
proto::AddLandmarkDataRequest *data_request) {
if (!add_landmark_client_) {
add_landmark_client_ =
make_unique<framework::Client<handlers::AddLandmarkDataHandler>>(
make_unique<async_grpc::Client<handlers::AddLandmarkDataSignature>>(
client_channel_);
}
TranslateTrajectoryId(data_request->mutable_sensor_metadata());
@ -217,9 +218,9 @@ void LocalTrajectoryUploader::ProcessLandmarkDataMessage(
void LocalTrajectoryUploader::ProcessLocalSlamResultDataMessage(
proto::AddLocalSlamResultDataRequest *data_request) {
if (!add_local_slam_result_client_) {
add_local_slam_result_client_ =
make_unique<framework::Client<handlers::AddLocalSlamResultDataHandler>>(
client_channel_);
add_local_slam_result_client_ = make_unique<
async_grpc::Client<handlers::AddLocalSlamResultDataSignature>>(
client_channel_);
}
TranslateTrajectoryId(data_request->mutable_sensor_metadata());
// A submap also holds a trajectory id that must be translated to uplink's
@ -245,7 +246,7 @@ void LocalTrajectoryUploader::AddTrajectory(
}
*request.add_expected_sensor_ids() =
cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id));
framework::Client<handlers::AddTrajectoryHandler> client(client_channel_);
async_grpc::Client<handlers::AddTrajectorySignature> client(client_channel_);
CHECK(client.Write(request));
CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 0);
local_to_cloud_trajectory_id_map_[local_trajectory_id] =
@ -258,7 +259,8 @@ void LocalTrajectoryUploader::FinishTrajectory(int local_trajectory_id) {
local_to_cloud_trajectory_id_map_[local_trajectory_id];
proto::FinishTrajectoryRequest request;
request.set_trajectory_id(cloud_trajectory_id);
framework::Client<handlers::FinishTrajectoryHandler> client(client_channel_);
async_grpc::Client<handlers::FinishTrajectorySignature> client(
client_channel_);
CHECK(client.Write(request));
}

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_MAP_BUILDER_CONTEXT_INTERFACE_H
#define CARTOGRAPHER_CLOUD_MAP_BUILDER_CONTEXT_INTERFACE_H
#include "cartographer/cloud/internal/framework/execution_context.h"
#include "async_grpc/execution_context.h"
#include "cartographer/cloud/internal/local_trajectory_uploader.h"
#include "cartographer/common/blocking_queue.h"
#include "cartographer/mapping/map_builder_interface.h"
@ -29,7 +29,7 @@ namespace cartographer {
namespace cloud {
class MapBuilderServer;
class MapBuilderContextInterface : public framework::ExecutionContext {
class MapBuilderContextInterface : public async_grpc::ExecutionContext {
public:
struct LocalSlamResult {
int trajectory_id;

View File

@ -49,7 +49,7 @@ MapBuilderServer::MapBuilderServer(
const proto::MapBuilderServerOptions& map_builder_server_options,
std::unique_ptr<mapping::MapBuilderInterface> map_builder)
: map_builder_(std::move(map_builder)) {
framework::Server::Builder server_builder;
async_grpc::Server::Builder server_builder;
server_builder.SetServerAddress(map_builder_server_options.server_address());
server_builder.SetNumGrpcThreads(
map_builder_server_options.num_grpc_threads());

View File

@ -17,8 +17,8 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_MAP_BUILDER_SERVER_H
#define CARTOGRAPHER_CLOUD_INTERNAL_MAP_BUILDER_SERVER_H
#include "cartographer/cloud/internal/framework/execution_context.h"
#include "cartographer/cloud/internal/framework/server.h"
#include "async_grpc/execution_context.h"
#include "async_grpc/server.h"
#include "cartographer/cloud/internal/local_trajectory_uploader.h"
#include "cartographer/cloud/internal/map_builder_context.h"
#include "cartographer/cloud/map_builder_server_interface.h"
@ -79,7 +79,7 @@ class MapBuilderServer : public MapBuilderServerInterface {
bool shutting_down_ = false;
std::unique_ptr<std::thread> slam_thread_;
std::unique_ptr<framework::Server> grpc_server_;
std::unique_ptr<async_grpc::Server> grpc_server_;
std::unique_ptr<mapping::MapBuilderInterface> map_builder_;
common::BlockingQueue<std::unique_ptr<MapBuilderContextInterface::Data>>
incoming_data_queue_;

View File

@ -17,7 +17,7 @@
#ifndef CARTOGRAPHER_CLOUD_INTERNAL_TESTING_HANDLER_TEST_H
#define CARTOGRAPHER_CLOUD_INTERNAL_TESTING_HANDLER_TEST_H
#include "cartographer/cloud/internal/framework/testing/rpc_handler_test_server.h"
#include "async_grpc/testing/rpc_handler_test_server.h"
#include "cartographer/common/make_unique.h"
#include "cartographer/mapping/internal/testing/mock_map_builder.h"
#include "cartographer/mapping/internal/testing/mock_pose_graph.h"
@ -32,12 +32,12 @@ namespace testing {
using ::testing::Return;
using ::testing::Test;
template <typename HandlerType>
template <typename HandlerConcept, typename HandlerType>
class HandlerTest : public Test {
public:
void SetUp() override {
test_server_ = common::make_unique<
framework::testing::RpcHandlerTestServer<HandlerType>>(
async_grpc::testing::RpcHandlerTestServer<HandlerConcept, HandlerType>>(
common::make_unique<MockMapBuilderContext>());
mock_map_builder_context_ =
test_server_
@ -66,9 +66,10 @@ class HandlerTest : public Test {
}
protected:
std::unique_ptr<framework::testing::RpcHandlerTestServer<HandlerType>>
std::unique_ptr<
async_grpc::testing::RpcHandlerTestServer<HandlerConcept, HandlerType>>
test_server_;
MockMapBuilderContext* mock_map_builder_context_;
MockMapBuilderContext *mock_map_builder_context_;
std::unique_ptr<MockLocalTrajectoryUploader> mock_local_trajectory_uploader_;
std::unique_ptr<mapping::testing::MockMapBuilder> mock_map_builder_;
std::unique_ptr<mapping::testing::MockPoseGraph> mock_pose_graph_;

29
scripts/install_async_grpc.sh Executable file
View File

@ -0,0 +1,29 @@
#!/bin/sh
# Copyright 2018 The Cartographer Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o errexit
set -o verbose
git clone https://github.com/googlecartographer/async_grpc
cd async_grpc
git checkout 654c75ebf553c2bdb624c87a690f5a238aeb651f
mkdir build
cd build
cmake -G Ninja \
-DCMAKE_BUILD_TYPE=Release \
..
ninja
sudo ninja install

View File

@ -20,4 +20,4 @@ set -o verbose
# Build and install Cartographer.
cd cartographer
bazel build //...
bazel test //...
bazel test --jobs 1 //...