From 915ebb19a68bfb1b1ff4e8fe8e972db9e5e32000 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Sch=C3=BCtte?= Date: Fri, 23 Mar 2018 10:52:48 +0100 Subject: [PATCH] Use async_grpc (#1013) Use async_grpc --- CMakeLists.txt | 7 + Dockerfile.trusty | 2 + bazel/repositories.bzl | 8 + cartographer/BUILD.bazel | 1 + cartographer/cloud/client/map_builder_stub.cc | 19 +- .../cloud/internal/client/pose_graph_stub.cc | 16 +- .../client/trajectory_builder_stub.cc | 38 +- .../internal/client/trajectory_builder_stub.h | 16 +- .../cloud/internal/framework/client.h | 205 ---------- .../framework/completion_queue_thread.cc | 48 --- .../framework/completion_queue_thread.h | 50 --- .../internal/framework/event_queue_thread.cc | 46 --- .../internal/framework/event_queue_thread.h | 50 --- .../internal/framework/execution_context.h | 66 --- .../framework/proto/math_service.proto | 58 --- .../cloud/internal/framework/retry.cc | 92 ----- cartographer/cloud/internal/framework/retry.h | 53 --- cartographer/cloud/internal/framework/rpc.cc | 383 ------------------ cartographer/cloud/internal/framework/rpc.h | 213 ---------- .../cloud/internal/framework/rpc_handler.h | 93 ----- .../framework/rpc_handler_interface.h | 66 --- .../cloud/internal/framework/server.cc | 193 --------- .../cloud/internal/framework/server.h | 200 --------- .../cloud/internal/framework/server_test.cc | 265 ------------ .../cloud/internal/framework/service.cc | 151 ------- .../cloud/internal/framework/service.h | 67 --- .../testing/rpc_handler_test_server.h | 141 ------- .../framework/testing/rpc_handler_wrapper.h | 60 --- .../cloud/internal/framework/type_traits.h | 71 ---- .../internal/framework/type_traits_test.cc | 45 -- .../add_fixed_frame_pose_data_handler.cc | 2 +- .../add_fixed_frame_pose_data_handler.h | 15 +- .../add_fixed_frame_pose_data_handler_test.cc | 3 +- .../internal/handlers/add_imu_data_handler.cc | 2 +- .../internal/handlers/add_imu_data_handler.h | 14 +- .../handlers/add_imu_data_handler_test.cc | 3 +- .../handlers/add_landmark_data_handler.cc | 2 +- .../handlers/add_landmark_data_handler.h | 14 +- .../add_landmark_data_handler_test.cc | 3 +- .../add_local_slam_result_data_handler.cc | 2 +- .../add_local_slam_result_data_handler.h | 15 +- .../handlers/add_odometry_data_handler.cc | 2 +- .../handlers/add_odometry_data_handler.h | 14 +- .../add_odometry_data_handler_test.cc | 3 +- .../handlers/add_rangefinder_data_handler.cc | 2 +- .../handlers/add_rangefinder_data_handler.h | 15 +- .../add_rangefinder_data_handler_test.cc | 3 +- .../handlers/add_trajectory_handler.cc | 2 +- .../handlers/add_trajectory_handler.h | 13 +- .../handlers/add_trajectory_handler_test.cc | 5 +- .../handlers/finish_trajectory_handler.cc | 2 +- .../handlers/finish_trajectory_handler.h | 13 +- .../internal/handlers/get_all_submap_poses.cc | 2 +- .../internal/handlers/get_all_submap_poses.h | 13 +- .../handlers/get_constraints_handler.cc | 2 +- .../handlers/get_constraints_handler.h | 13 +- .../handlers/get_landmark_poses_handler.cc | 2 +- .../handlers/get_landmark_poses_handler.h | 13 +- .../get_landmark_poses_handler_test.cc | 2 +- .../get_local_to_global_transform_handler.cc | 2 +- .../get_local_to_global_transform_handler.h | 14 +- .../internal/handlers/get_submap_handler.cc | 2 +- .../internal/handlers/get_submap_handler.h | 13 +- .../get_trajectory_node_poses_handler.cc | 2 +- .../get_trajectory_node_poses_handler.h | 13 +- .../internal/handlers/load_state_handler.cc | 2 +- .../internal/handlers/load_state_handler.h | 14 +- .../receive_local_slam_results_handler.cc | 2 +- .../receive_local_slam_results_handler.h | 15 +- .../run_final_optimization_handler.cc | 2 +- .../handlers/run_final_optimization_handler.h | 13 +- .../internal/handlers/write_state_handler.cc | 2 +- .../internal/handlers/write_state_handler.h | 14 +- .../internal/local_trajectory_uploader.cc | 56 +-- .../internal/map_builder_context_interface.h | 4 +- .../cloud/internal/map_builder_server.cc | 2 +- .../cloud/internal/map_builder_server.h | 6 +- .../cloud/internal/testing/handler_test.h | 11 +- scripts/install_async_grpc.sh | 29 ++ scripts/install_cartographer_bazel.sh | 2 +- 80 files changed, 296 insertions(+), 2843 deletions(-) delete mode 100644 cartographer/cloud/internal/framework/client.h delete mode 100644 cartographer/cloud/internal/framework/completion_queue_thread.cc delete mode 100644 cartographer/cloud/internal/framework/completion_queue_thread.h delete mode 100644 cartographer/cloud/internal/framework/event_queue_thread.cc delete mode 100644 cartographer/cloud/internal/framework/event_queue_thread.h delete mode 100644 cartographer/cloud/internal/framework/execution_context.h delete mode 100644 cartographer/cloud/internal/framework/proto/math_service.proto delete mode 100644 cartographer/cloud/internal/framework/retry.cc delete mode 100644 cartographer/cloud/internal/framework/retry.h delete mode 100644 cartographer/cloud/internal/framework/rpc.cc delete mode 100644 cartographer/cloud/internal/framework/rpc.h delete mode 100644 cartographer/cloud/internal/framework/rpc_handler.h delete mode 100644 cartographer/cloud/internal/framework/rpc_handler_interface.h delete mode 100644 cartographer/cloud/internal/framework/server.cc delete mode 100644 cartographer/cloud/internal/framework/server.h delete mode 100644 cartographer/cloud/internal/framework/server_test.cc delete mode 100644 cartographer/cloud/internal/framework/service.cc delete mode 100644 cartographer/cloud/internal/framework/service.h delete mode 100644 cartographer/cloud/internal/framework/testing/rpc_handler_test_server.h delete mode 100644 cartographer/cloud/internal/framework/testing/rpc_handler_wrapper.h delete mode 100644 cartographer/cloud/internal/framework/type_traits.h delete mode 100644 cartographer/cloud/internal/framework/type_traits_test.cc create mode 100755 scripts/install_async_grpc.sh diff --git a/CMakeLists.txt b/CMakeLists.txt index cbddde8..b47b40f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,10 @@ find_package(Eigen3 REQUIRED) find_package(LuaGoogle REQUIRED) find_package(Protobuf 3.0.0 REQUIRED) +if (${BUILD_GRPC}) + find_package(async_grpc REQUIRED) +endif() + include(FindPkgConfig) PKG_SEARCH_MODULE(CAIRO REQUIRED cairo>=1.12.16) @@ -172,6 +176,7 @@ if(${BUILD_GRPC}) cartographer/cloud/map_builder_server_main.cc ) target_link_libraries(cartographer_grpc_server PUBLIC grpc++) + target_link_libraries(cartographer_grpc_server PUBLIC async_grpc) if(${BUILD_PROMETHEUS}) target_link_libraries(cartographer_grpc_server PUBLIC prometheus-cpp) endif() @@ -186,6 +191,7 @@ foreach(ABS_FIL ${ALL_TESTS}) google_test("${TEST_TARGET_NAME}" ${ABS_FIL}) if(${BUILD_GRPC}) target_link_libraries("${TEST_TARGET_NAME}" PUBLIC grpc++) + target_link_libraries("${TEST_TARGET_NAME}" PUBLIC async_grpc) endif() if(${BUILD_PROMETHEUS}) target_link_libraries(${PROJECT_NAME} PUBLIC prometheus-cpp) @@ -223,6 +229,7 @@ target_include_directories(${PROJECT_NAME} SYSTEM PUBLIC target_link_libraries(${PROJECT_NAME} PUBLIC ${PROTOBUF_LIBRARY} pthread) if(${BUILD_GRPC}) target_link_libraries(${PROJECT_NAME} PUBLIC grpc++) + target_link_libraries(${PROJECT_NAME} PUBLIC async_grpc) endif() if(${BUILD_PROMETHEUS}) target_link_libraries(${PROJECT_NAME} PUBLIC prometheus-cpp) diff --git a/Dockerfile.trusty b/Dockerfile.trusty index afd68bd..4c8c799 100644 --- a/Dockerfile.trusty +++ b/Dockerfile.trusty @@ -21,6 +21,8 @@ COPY scripts/install_proto3.sh cartographer/scripts/ RUN cartographer/scripts/install_proto3.sh && rm -rf protobuf COPY scripts/install_grpc.sh cartographer/scripts/ RUN cartographer/scripts/install_grpc.sh && rm -rf grpc +COPY scripts/install_async_grpc.sh cartographer/scripts/ +RUN cartographer/scripts/install_async_grpc.sh && rm -rf async_grpc COPY scripts/install_prometheus_cpp.sh cartographer/scripts/ RUN cartographer/scripts/install_prometheus_cpp.sh && rm -rf prometheus-cpp COPY . cartographer diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 223ad53..b9477d4 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -226,6 +226,14 @@ def cartographer_repositories(): ], ) + _maybe(native.http_archive, + name = "com_github_googlecartographer_async_grpc", + strip_prefix = "async_grpc-654c75ebf553c2bdb624c87a690f5a238aeb651f", + urls = [ + "https://github.com/googlecartographer/async_grpc/archive/654c75ebf553c2bdb624c87a690f5a238aeb651f.tar.gz", + ], + ) + # TODO(rodrigoq): remove these binds once grpc#14140 has been merged, as well # as removing `use_external` in cartographer_grpc/BUILD.bazel. # https://github.com/grpc/grpc/pull/14140 diff --git a/cartographer/BUILD.bazel b/cartographer/BUILD.bazel index 23c0f98..e4ac8cd 100644 --- a/cartographer/BUILD.bazel +++ b/cartographer/BUILD.bazel @@ -70,6 +70,7 @@ cc_library( ":cc_protos", "@boost//:iostreams", "@com_google_glog//:glog", + "@com_github_googlecartographer_async_grpc//async_grpc:async_grpc", "@org_cairographics_cairo//:cairo", "@org_ceres_solver_ceres_solver//:ceres", "@org_lua_lua//:lua", diff --git a/cartographer/cloud/client/map_builder_stub.cc b/cartographer/cloud/client/map_builder_stub.cc index f26922c..73c82dc 100644 --- a/cartographer/cloud/client/map_builder_stub.cc +++ b/cartographer/cloud/client/map_builder_stub.cc @@ -49,8 +49,8 @@ int MapBuilderStub::AddTrajectoryBuilder( for (const auto& sensor_id : expected_sensor_ids) { *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id); } - framework::Client client( - client_channel_, framework::CreateLimitedBackoffStrategy( + async_grpc::Client client( + client_channel_, async_grpc::CreateLimitedBackoffStrategy( common::FromMilliseconds(100), 2.f, 5)); CHECK(client.Write(request)); @@ -78,7 +78,8 @@ mapping::TrajectoryBuilderInterface* MapBuilderStub::GetTrajectoryBuilder( void MapBuilderStub::FinishTrajectory(int trajectory_id) { proto::FinishTrajectoryRequest request; request.set_trajectory_id(trajectory_id); - framework::Client client(client_channel_); + async_grpc::Client client( + client_channel_); CHECK(client.Write(request)); trajectory_builder_stubs_.erase(trajectory_id); } @@ -88,7 +89,7 @@ std::string MapBuilderStub::SubmapToProto( mapping::proto::SubmapQuery::Response* submap_query_response) { proto::GetSubmapRequest request; submap_id.ToProto(request.mutable_submap_id()); - framework::Client client(client_channel_); + async_grpc::Client client(client_channel_); CHECK(client.Write(request)); submap_query_response->CopyFrom(client.response().submap_query_response()); return client.response().error_msg(); @@ -96,10 +97,10 @@ std::string MapBuilderStub::SubmapToProto( void MapBuilderStub::SerializeState(io::ProtoStreamWriterInterface* writer) { google::protobuf::Empty request; - framework::Client client(client_channel_); + async_grpc::Client client(client_channel_); CHECK(client.Write(request)); proto::WriteStateResponse response; - while (client.Read(&response)) { + while (client.StreamRead(&response)) { switch (response.state_chunk_case()) { case proto::WriteStateResponse::kPoseGraph: writer->WriteProto(response.pose_graph()); @@ -121,7 +122,7 @@ void MapBuilderStub::LoadState(io::ProtoStreamReaderInterface* reader, if (!load_frozen_state) { LOG(FATAL) << "Not implemented"; } - framework::Client client(client_channel_); + async_grpc::Client client(client_channel_); // Request with a PoseGraph proto is sent first. { proto::LoadStateRequest request; @@ -141,8 +142,8 @@ void MapBuilderStub::LoadState(io::ProtoStreamReaderInterface* reader, } CHECK(reader->eof()); - CHECK(client.WritesDone()); - CHECK(client.Finish().ok()); + CHECK(client.StreamWritesDone()); + CHECK(client.StreamFinish().ok()); } int MapBuilderStub::num_trajectory_builders() const { diff --git a/cartographer/cloud/internal/client/pose_graph_stub.cc b/cartographer/cloud/internal/client/pose_graph_stub.cc index 86f1dd8..bbbce1a 100644 --- a/cartographer/cloud/internal/client/pose_graph_stub.cc +++ b/cartographer/cloud/internal/client/pose_graph_stub.cc @@ -15,7 +15,7 @@ */ #include "cartographer/cloud/internal/client/pose_graph_stub.h" -#include "cartographer/cloud/internal/framework/client.h" +#include "async_grpc/client.h" #include "cartographer/cloud/internal/handlers/get_all_submap_poses.h" #include "cartographer/cloud/internal/handlers/get_constraints_handler.h" #include "cartographer/cloud/internal/handlers/get_landmark_poses_handler.h" @@ -34,7 +34,7 @@ PoseGraphStub::PoseGraphStub(std::shared_ptr<::grpc::Channel> client_channel) void PoseGraphStub::RunFinalOptimization() { google::protobuf::Empty request; - framework::Client client( + async_grpc::Client client( client_channel_); CHECK(client.Write(request)); } @@ -47,7 +47,8 @@ PoseGraphStub::GetAllSubmapData() { mapping::MapById PoseGraphStub::GetAllSubmapPoses() { google::protobuf::Empty request; - framework::Client client(client_channel_); + async_grpc::Client client( + client_channel_); CHECK(client.Write(request)); mapping::MapById submap_poses; @@ -65,7 +66,7 @@ PoseGraphStub::GetAllSubmapPoses() { transform::Rigid3d PoseGraphStub::GetLocalToGlobalTransform(int trajectory_id) { proto::GetLocalToGlobalTransformRequest request; request.set_trajectory_id(trajectory_id); - framework::Client client( + async_grpc::Client client( client_channel_); CHECK(client.Write(request)); return transform::ToRigid3(client.response().local_to_global()); @@ -79,7 +80,7 @@ PoseGraphStub::GetTrajectoryNodes() { mapping::MapById PoseGraphStub::GetTrajectoryNodePoses() { google::protobuf::Empty request; - framework::Client client( + async_grpc::Client client( client_channel_); CHECK(client.Write(request)); mapping::MapById node_poses; @@ -95,7 +96,8 @@ PoseGraphStub::GetTrajectoryNodePoses() { std::map PoseGraphStub::GetLandmarkPoses() { google::protobuf::Empty request; - framework::Client client(client_channel_); + async_grpc::Client client( + client_channel_); CHECK(client.Write(request)); std::map landmark_poses; for (const auto& landmark_pose : client.response().landmark_poses()) { @@ -121,7 +123,7 @@ PoseGraphStub::GetTrajectoryData() { std::vector PoseGraphStub::constraints() { google::protobuf::Empty request; - framework::Client client(client_channel_); + async_grpc::Client client(client_channel_); CHECK(client.Write(request)); return mapping::FromProto(client.response().constraints()); } diff --git a/cartographer/cloud/internal/client/trajectory_builder_stub.cc b/cartographer/cloud/internal/client/trajectory_builder_stub.cc index 952149b..efc7342 100644 --- a/cartographer/cloud/internal/client/trajectory_builder_stub.cc +++ b/cartographer/cloud/internal/client/trajectory_builder_stub.cc @@ -50,24 +50,24 @@ TrajectoryBuilderStub::~TrajectoryBuilderStub() { receive_local_slam_results_thread_->join(); } if (add_rangefinder_client_) { - CHECK(add_rangefinder_client_->WritesDone()); - CHECK(add_rangefinder_client_->Finish().ok()); + CHECK(add_rangefinder_client_->StreamWritesDone()); + CHECK(add_rangefinder_client_->StreamFinish().ok()); } if (add_imu_client_) { - CHECK(add_imu_client_->WritesDone()); - CHECK(add_imu_client_->Finish().ok()); + CHECK(add_imu_client_->StreamWritesDone()); + CHECK(add_imu_client_->StreamFinish().ok()); } if (add_odometry_client_) { - CHECK(add_odometry_client_->WritesDone()); - CHECK(add_odometry_client_->Finish().ok()); + CHECK(add_odometry_client_->StreamWritesDone()); + CHECK(add_odometry_client_->StreamFinish().ok()); } if (add_landmark_client_) { - CHECK(add_landmark_client_->WritesDone()); - CHECK(add_landmark_client_->Finish().ok()); + CHECK(add_landmark_client_->StreamWritesDone()); + CHECK(add_landmark_client_->StreamFinish().ok()); } if (add_fixed_frame_pose_client_) { - CHECK(add_fixed_frame_pose_client_->WritesDone()); - CHECK(add_fixed_frame_pose_client_->Finish().ok()); + CHECK(add_fixed_frame_pose_client_->StreamWritesDone()); + CHECK(add_fixed_frame_pose_client_->StreamFinish().ok()); } } @@ -76,7 +76,7 @@ void TrajectoryBuilderStub::AddSensorData( const sensor::TimedPointCloudData& timed_point_cloud_data) { if (!add_rangefinder_client_) { add_rangefinder_client_ = common::make_unique< - framework::Client>( + async_grpc::Client>( client_channel_); } proto::AddRangefinderDataRequest request; @@ -90,7 +90,7 @@ void TrajectoryBuilderStub::AddSensorData(const std::string& sensor_id, const sensor::ImuData& imu_data) { if (!add_imu_client_) { add_imu_client_ = - common::make_unique>( + common::make_unique>( client_channel_); } proto::AddImuDataRequest request; @@ -103,7 +103,8 @@ void TrajectoryBuilderStub::AddSensorData( const std::string& sensor_id, const sensor::OdometryData& odometry_data) { if (!add_odometry_client_) { add_odometry_client_ = common::make_unique< - framework::Client>(client_channel_); + async_grpc::Client>( + client_channel_); } proto::AddOdometryDataRequest request; CreateAddOdometryDataRequest(sensor_id, trajectory_id_, @@ -116,7 +117,7 @@ void TrajectoryBuilderStub::AddSensorData( const sensor::FixedFramePoseData& fixed_frame_pose) { if (!add_fixed_frame_pose_client_) { add_fixed_frame_pose_client_ = common::make_unique< - framework::Client>( + async_grpc::Client>( client_channel_); } proto::AddFixedFramePoseDataRequest request; @@ -129,7 +130,8 @@ void TrajectoryBuilderStub::AddSensorData( const std::string& sensor_id, const sensor::LandmarkData& landmark_data) { if (!add_landmark_client_) { add_landmark_client_ = common::make_unique< - framework::Client>(client_channel_); + async_grpc::Client>( + client_channel_); } proto::AddLandmarkDataRequest request; CreateAddLandmarkDataRequest(sensor_id, trajectory_id_, @@ -143,10 +145,10 @@ void TrajectoryBuilderStub::AddLocalSlamResultData( } void TrajectoryBuilderStub::RunLocalSlamResultsReader( - framework::Client* client, + async_grpc::Client* client, LocalSlamResultCallback local_slam_result_callback) { proto::ReceiveLocalSlamResultsResponse response; - while (client->Read(&response)) { + while (client->StreamRead(&response)) { int trajectory_id = response.trajectory_id(); common::Time time = common::FromUniversal(response.timestamp()); transform::Rigid3d local_pose = transform::ToRigid3(response.local_pose()); @@ -161,7 +163,7 @@ void TrajectoryBuilderStub::RunLocalSlamResultsReader( local_slam_result_callback(trajectory_id, time, local_pose, range_data, std::move(insertion_result)); } - client->Finish(); + client->StreamFinish(); } } // namespace cloud diff --git a/cartographer/cloud/internal/client/trajectory_builder_stub.h b/cartographer/cloud/internal/client/trajectory_builder_stub.h index 7382fb6..9190bf6 100644 --- a/cartographer/cloud/internal/client/trajectory_builder_stub.h +++ b/cartographer/cloud/internal/client/trajectory_builder_stub.h @@ -19,7 +19,7 @@ #include -#include "cartographer/cloud/internal/framework/client.h" +#include "async_grpc/client.h" #include "cartographer/cloud/internal/handlers/add_fixed_frame_pose_data_handler.h" #include "cartographer/cloud/internal/handlers/add_imu_data_handler.h" #include "cartographer/cloud/internal/handlers/add_landmark_data_handler.h" @@ -62,23 +62,23 @@ class TrajectoryBuilderStub : public mapping::TrajectoryBuilderInterface { private: static void RunLocalSlamResultsReader( - framework::Client* + async_grpc::Client* client_reader, LocalSlamResultCallback local_slam_result_callback); std::shared_ptr<::grpc::Channel> client_channel_; const int trajectory_id_; - std::unique_ptr> + std::unique_ptr> add_rangefinder_client_; - std::unique_ptr> + std::unique_ptr> add_imu_client_; - std::unique_ptr> + std::unique_ptr> add_odometry_client_; - std::unique_ptr> + std::unique_ptr> add_fixed_frame_pose_client_; - std::unique_ptr> + std::unique_ptr> add_landmark_client_; - framework::Client + async_grpc::Client receive_local_slam_results_client_; std::unique_ptr receive_local_slam_results_thread_; }; diff --git a/cartographer/cloud/internal/framework/client.h b/cartographer/cloud/internal/framework/client.h deleted file mode 100644 index 31e313b..0000000 --- a/cartographer/cloud/internal/framework/client.h +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Copyright 2018 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_CLIENT_H -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_CLIENT_H - -#include "cartographer/cloud/internal/framework/retry.h" -#include "cartographer/cloud/internal/framework/rpc_handler_interface.h" -#include "cartographer/cloud/internal/framework/type_traits.h" -#include "glog/logging.h" -#include "grpc++/grpc++.h" -#include "grpc++/impl/codegen/client_unary_call.h" -#include "grpc++/impl/codegen/sync_stream.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -template -class Client { - public: - Client(std::shared_ptr<::grpc::Channel> channel, RetryStrategy retry_strategy) - : channel_(channel), - client_context_(common::make_unique<::grpc::ClientContext>()), - rpc_method_name_( - RpcHandlerInterface::Instantiate()->method_name()), - rpc_method_(rpc_method_name_.c_str(), - RpcType::value, - channel_), - retry_strategy_(retry_strategy) { - CHECK(!retry_strategy || - rpc_method_.method_type() == ::grpc::internal::RpcMethod::NORMAL_RPC) - << "Retry is currently only support for NORMAL_RPC."; - } - - Client(std::shared_ptr<::grpc::Channel> channel) - : channel_(channel), - client_context_(common::make_unique<::grpc::ClientContext>()), - rpc_method_name_( - RpcHandlerInterface::Instantiate()->method_name()), - rpc_method_(rpc_method_name_.c_str(), - RpcType::value, - channel_) {} - - bool Read(typename RpcHandlerType::ResponseType *response) { - switch (rpc_method_.method_type()) { - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - InstantiateClientReaderWriterIfNeeded(); - return client_reader_writer_->Read(response); - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - CHECK(client_reader_); - return client_reader_->Read(response); - default: - LOG(FATAL) << "Not implemented."; - } - } - - bool Write(const typename RpcHandlerType::RequestType &request) { - return RetryWithStrategy( - retry_strategy_, - std::bind(&Client::WriteImpl, this, request), - std::bind(&Client::Reset, this)); - } - - bool WritesDone() { - switch (rpc_method_.method_type()) { - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - InstantiateClientWriterIfNeeded(); - return client_writer_->WritesDone(); - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - InstantiateClientReaderWriterIfNeeded(); - return client_reader_writer_->WritesDone(); - default: - LOG(FATAL) << "Not implemented."; - } - } - - ::grpc::Status Finish() { - switch (rpc_method_.method_type()) { - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - InstantiateClientWriterIfNeeded(); - return client_writer_->Finish(); - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - InstantiateClientReaderWriterIfNeeded(); - return client_reader_writer_->Finish(); - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - CHECK(client_reader_); - return client_reader_->Finish(); - default: - LOG(FATAL) << "Not implemented."; - } - } - - const typename RpcHandlerType::ResponseType &response() { - CHECK(rpc_method_.method_type() == - ::grpc::internal::RpcMethod::NORMAL_RPC || - rpc_method_.method_type() == - ::grpc::internal::RpcMethod::CLIENT_STREAMING); - return response_; - } - - private: - void Reset() { - client_context_ = common::make_unique<::grpc::ClientContext>(); - } - - bool WriteImpl(const typename RpcHandlerType::RequestType &request) { - switch (rpc_method_.method_type()) { - case ::grpc::internal::RpcMethod::NORMAL_RPC: - return MakeBlockingUnaryCall(request, &response_).ok(); - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - InstantiateClientWriterIfNeeded(); - return client_writer_->Write(request); - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - InstantiateClientReaderWriterIfNeeded(); - return client_reader_writer_->Write(request); - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - InstantiateClientReader(request); - return true; - } - LOG(FATAL) << "Not reached."; - } - - void InstantiateClientWriterIfNeeded() { - CHECK_EQ(rpc_method_.method_type(), - ::grpc::internal::RpcMethod::CLIENT_STREAMING); - if (!client_writer_) { - client_writer_.reset( - ::grpc::internal:: - ClientWriterFactory::Create( - channel_.get(), rpc_method_, client_context_.get(), - &response_)); - } - } - - void InstantiateClientReaderWriterIfNeeded() { - CHECK_EQ(rpc_method_.method_type(), - ::grpc::internal::RpcMethod::BIDI_STREAMING); - if (!client_reader_writer_) { - client_reader_writer_.reset( - ::grpc::internal::ClientReaderWriterFactory< - typename RpcHandlerType::RequestType, - typename RpcHandlerType::ResponseType>::Create(channel_.get(), - rpc_method_, - client_context_ - .get())); - } - } - - void InstantiateClientReader( - const typename RpcHandlerType::RequestType &request) { - CHECK_EQ(rpc_method_.method_type(), - ::grpc::internal::RpcMethod::SERVER_STREAMING); - client_reader_.reset( - ::grpc::internal:: - ClientReaderFactory::Create( - channel_.get(), rpc_method_, client_context_.get(), request)); - } - - ::grpc::Status MakeBlockingUnaryCall( - const typename RpcHandlerType::RequestType &request, - typename RpcHandlerType::ResponseType *response) { - CHECK_EQ(rpc_method_.method_type(), - ::grpc::internal::RpcMethod::NORMAL_RPC); - return ::grpc::internal::BlockingUnaryCall( - channel_.get(), rpc_method_, client_context_.get(), request, response); - } - - std::shared_ptr<::grpc::Channel> channel_; - std::unique_ptr<::grpc::ClientContext> client_context_; - const std::string rpc_method_name_; - const ::grpc::internal::RpcMethod rpc_method_; - - std::unique_ptr<::grpc::ClientWriter> - client_writer_; - std::unique_ptr< - ::grpc::ClientReaderWriter> - client_reader_writer_; - std::unique_ptr<::grpc::ClientReader> - client_reader_; - typename RpcHandlerType::ResponseType response_; - RetryStrategy retry_strategy_; -}; - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_CLIENT_H diff --git a/cartographer/cloud/internal/framework/completion_queue_thread.cc b/cartographer/cloud/internal/framework/completion_queue_thread.cc deleted file mode 100644 index 776e14d..0000000 --- a/cartographer/cloud/internal/framework/completion_queue_thread.cc +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "cartographer/cloud/internal/framework/completion_queue_thread.h" - -#include "cartographer/common/make_unique.h" -#include "glog/logging.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -CompletionQueueThread::CompletionQueueThread( - std::unique_ptr<::grpc::ServerCompletionQueue> completion_queue) - : completion_queue_(std::move(completion_queue)) {} - -::grpc::ServerCompletionQueue* CompletionQueueThread::completion_queue() { - return completion_queue_.get(); -} - -void CompletionQueueThread::Start(CompletionQueueRunner runner) { - CHECK(!worker_thread_); - worker_thread_ = common::make_unique( - [this, runner]() { runner(this->completion_queue_.get()); }); -} - -void CompletionQueueThread::Shutdown() { - LOG(INFO) << "Shutting down completion queue " << completion_queue_.get(); - completion_queue_->Shutdown(); - worker_thread_->join(); -} - -} // namespace framework -} // namespace cloud -} // namespace cartographer diff --git a/cartographer/cloud/internal/framework/completion_queue_thread.h b/cartographer/cloud/internal/framework/completion_queue_thread.h deleted file mode 100644 index c27a530..0000000 --- a/cartographer/cloud/internal/framework/completion_queue_thread.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_INTERNAL_FRAMEWORK_COMPLETION_QUEUE_THREAD_H_ -#define CARTOGRAPHER_INTERNAL_FRAMEWORK_COMPLETION_QUEUE_THREAD_H_ - -#include -#include -#include - -namespace cartographer { -namespace cloud { -namespace framework { - -class CompletionQueueThread { - public: - using CompletionQueueRunner = - std::function; - - explicit CompletionQueueThread( - std::unique_ptr<::grpc::ServerCompletionQueue> completion_queue); - - ::grpc::ServerCompletionQueue* completion_queue(); - - void Start(CompletionQueueRunner runner); - void Shutdown(); - - private: - std::unique_ptr<::grpc::ServerCompletionQueue> completion_queue_; - std::unique_ptr worker_thread_; -}; - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_INTERNAL_FRAMEWORK_COMPLETION_QUEUE_THREAD_H_ diff --git a/cartographer/cloud/internal/framework/event_queue_thread.cc b/cartographer/cloud/internal/framework/event_queue_thread.cc deleted file mode 100644 index d259b06..0000000 --- a/cartographer/cloud/internal/framework/event_queue_thread.cc +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "cartographer/cloud/internal/framework/event_queue_thread.h" - -#include "cartographer/common/make_unique.h" -#include "glog/logging.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -EventQueueThread::EventQueueThread() { - event_queue_ = common::make_unique(); -} - -EventQueue* EventQueueThread::event_queue() { return event_queue_.get(); } - -void EventQueueThread::Start(EventQueueRunner runner) { - CHECK(!thread_); - EventQueue* event_queue = event_queue_.get(); - thread_ = common::make_unique( - [event_queue, runner]() { runner(event_queue); }); -} - -void EventQueueThread::Shutdown() { - LOG(INFO) << "Shutting down event queue " << event_queue_.get(); - thread_->join(); -} - -} // namespace framework -} // namespace cloud -} // namespace cartographer diff --git a/cartographer/cloud/internal/framework/event_queue_thread.h b/cartographer/cloud/internal/framework/event_queue_thread.h deleted file mode 100644 index 3b57601..0000000 --- a/cartographer/cloud/internal/framework/event_queue_thread.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EVENT_QUEUE_THREAD_H -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EVENT_QUEUE_THREAD_H - -#include -#include - -#include "cartographer/cloud/internal/framework/rpc.h" -#include "cartographer/common/blocking_queue.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -class EventQueueThread { - public: - using EventQueueRunner = std::function; - - EventQueueThread(); - - EventQueue* event_queue(); - - void Start(EventQueueRunner runner); - void Shutdown(); - - private: - std::unique_ptr event_queue_; - std::unique_ptr thread_; -}; - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EVENT_QUEUE_THREAD_H diff --git a/cartographer/cloud/internal/framework/execution_context.h b/cartographer/cloud/internal/framework/execution_context.h deleted file mode 100644 index f62c542..0000000 --- a/cartographer/cloud/internal/framework/execution_context.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EXECUTION_CONTEXT_H -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EXECUTION_CONTEXT_H - -#include "cartographer/common/mutex.h" -#include "glog/logging.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -// Implementations of this class allow RPC handlers to share state among one -// another. Using Server::SetExecutionContext(...) a server-wide -// 'ExecutionContext' can be specified. This 'ExecutionContext' can be retrieved -// by all implementations of 'RpcHandler' by calling -// 'RpcHandler::GetContext()'. -class ExecutionContext { - public: - // Automatically locks an ExecutionContext for shared use by RPC handlers. - // This non-movable, non-copyable class is used to broker access from various - // RPC handlers to the shared 'ExecutionContext'. - template - class Synchronized { - public: - ContextType* operator->() { - return static_cast(execution_context_); - } - Synchronized(common::Mutex* lock, ExecutionContext* execution_context) - : locker_(lock), execution_context_(execution_context) {} - Synchronized(const Synchronized&) = delete; - Synchronized(Synchronized&&) = delete; - - private: - common::MutexLocker locker_; - ExecutionContext* execution_context_; - }; - ExecutionContext() = default; - virtual ~ExecutionContext() = default; - ExecutionContext(const ExecutionContext&) = delete; - ExecutionContext& operator=(const ExecutionContext&) = delete; - common::Mutex* lock() { return &lock_; } - - private: - common::Mutex lock_; -}; - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_EXECUTION_CONTEXT_H diff --git a/cartographer/cloud/internal/framework/proto/math_service.proto b/cartographer/cloud/internal/framework/proto/math_service.proto deleted file mode 100644 index 29f8e43..0000000 --- a/cartographer/cloud/internal/framework/proto/math_service.proto +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2017 The Cartographer Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package cartographer.cloud.framework.proto; - -message GetSumRequest { - int32 input = 1; -} - -message GetSumResponse { - int32 output = 1; -} - -message GetSquareRequest { - int32 input = 1; -} - -message GetSquareResponse { - int32 output = 1; -} - -message GetEchoRequest { - int32 input = 1; -} - -message GetEchoResponse { - int32 output = 1; -} - -message GetSequenceRequest { - int32 input = 1; -} - -message GetSequenceResponse { - int32 output = 1; -} - -// Provides information about the gRPC server. -service Math { - rpc GetSum(stream GetSumRequest) returns (GetSumResponse); - rpc GetSquare(GetSquareRequest) returns (GetSquareResponse); - rpc GetRunningSum(stream GetSumRequest) returns (stream GetSumResponse); - rpc GetEcho(GetEchoRequest) returns (GetEchoResponse); - rpc GetSequence(GetSequenceRequest) returns (stream GetSequenceResponse); -} diff --git a/cartographer/cloud/internal/framework/retry.cc b/cartographer/cloud/internal/framework/retry.cc deleted file mode 100644 index 2bc53de..0000000 --- a/cartographer/cloud/internal/framework/retry.cc +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2018 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include "cartographer/cloud/internal/framework/retry.h" -#include "glog/logging.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -RetryStrategy CreateRetryStrategy(RetryIndicator retry_indicator, - RetryDelayCalculator retry_delay_calculator) { - return [retry_indicator, retry_delay_calculator](int failed_attempts) { - if (!retry_indicator(failed_attempts)) { - return optional(); - } - return optional(retry_delay_calculator(failed_attempts)); - }; -} - -RetryIndicator CreateLimitedRetryIndicator(int max_attempts) { - return [max_attempts](int failed_attempts) { - return failed_attempts < max_attempts; - }; -} - -RetryDelayCalculator CreateBackoffDelayCalculator(Duration min_delay, - float backoff_factor) { - return [min_delay, backoff_factor](int failed_attempts) -> Duration { - CHECK_GE(failed_attempts, 0); - using common::FromSeconds; - using common::ToSeconds; - return FromSeconds(std::pow(backoff_factor, failed_attempts - 1) * - ToSeconds(min_delay)); - }; -} - -RetryStrategy CreateLimitedBackoffStrategy(Duration min_delay, - float backoff_factor, - int max_attempts) { - return CreateRetryStrategy( - CreateLimitedRetryIndicator(max_attempts), - CreateBackoffDelayCalculator(min_delay, backoff_factor)); -} - -bool RetryWithStrategy(RetryStrategy retry_strategy, std::function op, - std::function reset) { - optional delay; - int failed_attemps = 0; - for (;;) { - if (op()) { - return true; - } - if (!retry_strategy) { - return false; - } - delay = retry_strategy(++failed_attemps); - if (!delay.has_value()) { - break; - } - LOG(INFO) << "Retrying after " - << std::chrono::duration_cast( - delay.value()) - .count() - << " milliseconds."; - std::this_thread::sleep_for(delay.value()); - if (reset) { - reset(); - } - } - return false; -} - -} // namespace framework -} // namespace cloud -} // namespace cartographer diff --git a/cartographer/cloud/internal/framework/retry.h b/cartographer/cloud/internal/framework/retry.h deleted file mode 100644 index 5aaf75a..0000000 --- a/cartographer/cloud/internal/framework/retry.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2018 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RETRY_H -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RETRY_H - -#include "cartographer/common/optional.h" -#include "cartographer/common/time.h" -#include "grpc++/grpc++.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -using common::Duration; -using common::optional; - -using RetryStrategy = - std::function(int /* failed_attempts */)>; -using RetryIndicator = std::function; -using RetryDelayCalculator = std::function; - -RetryStrategy CreateRetryStrategy(RetryIndicator retry_indicator, - RetryDelayCalculator retry_delay_calculator); - -RetryIndicator CreateLimitedRetryIndicator(int max_attempts); -RetryDelayCalculator CreateBackoffDelayCalculator(Duration min_delay, - float backoff_factor); -RetryStrategy CreateLimitedBackoffStrategy(Duration min_delay, - float backoff_factor, - int max_attempts); - -bool RetryWithStrategy(RetryStrategy retry_strategy, std::function op, - std::function reset = nullptr); - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RETRY_H diff --git a/cartographer/cloud/internal/framework/rpc.cc b/cartographer/cloud/internal/framework/rpc.cc deleted file mode 100644 index a2c892c..0000000 --- a/cartographer/cloud/internal/framework/rpc.cc +++ /dev/null @@ -1,383 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "cartographer/cloud/internal/framework/rpc.h" -#include "cartographer/cloud/internal/framework/service.h" - -#include "cartographer/common/make_unique.h" -#include "glog/logging.h" - -namespace cartographer { -namespace cloud { -namespace framework { -namespace { - -// Finishes the gRPC for non-streaming response RPCs, i.e. NORMAL_RPC and -// CLIENT_STREAMING. If no 'msg' is passed, we signal an error to the client as -// the server is not honoring the gRPC call signature. -template -void SendUnaryFinish(ReaderWriter* reader_writer, ::grpc::Status status, - const google::protobuf::Message* msg, - Rpc::EventBase* rpc_event) { - if (msg) { - reader_writer->Finish(*msg, status, rpc_event); - } else { - reader_writer->FinishWithError(status, rpc_event); - } -} - -} // namespace - -void Rpc::CompletionQueueRpcEvent::Handle() { - pending = false; - rpc_ptr->service()->HandleEvent(event, rpc_ptr, ok); -} - -void Rpc::InternalRpcEvent::Handle() { - if (auto rpc_shared = rpc.lock()) { - rpc_shared->service()->HandleEvent(event, rpc_shared.get(), true); - } else { - LOG(WARNING) << "Ignoring stale event."; - } -} - -Rpc::Rpc(int method_index, - ::grpc::ServerCompletionQueue* server_completion_queue, - EventQueue* event_queue, ExecutionContext* execution_context, - const RpcHandlerInfo& rpc_handler_info, Service* service, - WeakPtrFactory weak_ptr_factory) - : method_index_(method_index), - server_completion_queue_(server_completion_queue), - event_queue_(event_queue), - execution_context_(execution_context), - rpc_handler_info_(rpc_handler_info), - service_(service), - weak_ptr_factory_(weak_ptr_factory), - new_connection_event_(Event::NEW_CONNECTION, this), - read_event_(Event::READ, this), - write_event_(Event::WRITE, this), - finish_event_(Event::FINISH, this), - done_event_(Event::DONE, this), - handler_(rpc_handler_info_.rpc_handler_factory(this, execution_context)) { - InitializeReadersAndWriters(rpc_handler_info_.rpc_type); - - // Initialize the prototypical request and response messages. - request_.reset(::google::protobuf::MessageFactory::generated_factory() - ->GetPrototype(rpc_handler_info_.request_descriptor) - ->New()); - response_.reset(::google::protobuf::MessageFactory::generated_factory() - ->GetPrototype(rpc_handler_info_.response_descriptor) - ->New()); -} - -std::unique_ptr Rpc::Clone() { - return common::make_unique( - method_index_, server_completion_queue_, event_queue_, execution_context_, - rpc_handler_info_, service_, weak_ptr_factory_); -} - -void Rpc::OnRequest() { handler_->OnRequestInternal(request_.get()); } - -void Rpc::OnReadsDone() { handler_->OnReadsDone(); } - -void Rpc::OnFinish() { handler_->OnFinish(); } - -void Rpc::RequestNextMethodInvocation() { - // Ask gRPC to notify us when the connection terminates. - SetRpcEventState(Event::DONE, true); - // TODO(gaschler): Asan reports direct leak of this new from both calls - // StartServing and HandleNewConnection. - server_context_.AsyncNotifyWhenDone(GetRpcEvent(Event::DONE)); - - // Make sure after terminating the connection, gRPC notifies us with this - // event. - SetRpcEventState(Event::NEW_CONNECTION, true); - switch (rpc_handler_info_.rpc_type) { - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - service_->RequestAsyncBidiStreaming( - method_index_, &server_context_, streaming_interface(), - server_completion_queue_, server_completion_queue_, - GetRpcEvent(Event::NEW_CONNECTION)); - break; - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - service_->RequestAsyncClientStreaming( - method_index_, &server_context_, streaming_interface(), - server_completion_queue_, server_completion_queue_, - GetRpcEvent(Event::NEW_CONNECTION)); - break; - case ::grpc::internal::RpcMethod::NORMAL_RPC: - service_->RequestAsyncUnary( - method_index_, &server_context_, request_.get(), - streaming_interface(), server_completion_queue_, - server_completion_queue_, GetRpcEvent(Event::NEW_CONNECTION)); - break; - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - service_->RequestAsyncServerStreaming( - method_index_, &server_context_, request_.get(), - streaming_interface(), server_completion_queue_, - server_completion_queue_, GetRpcEvent(Event::NEW_CONNECTION)); - break; - } -} - -void Rpc::RequestStreamingReadIfNeeded() { - // For request-streaming RPCs ask the client to start sending requests. - switch (rpc_handler_info_.rpc_type) { - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - SetRpcEventState(Event::READ, true); - async_reader_interface()->Read(request_.get(), GetRpcEvent(Event::READ)); - break; - case ::grpc::internal::RpcMethod::NORMAL_RPC: - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - // For NORMAL_RPC and SERVER_STREAMING we don't need to queue an event, - // since gRPC automatically issues a READ request and places the request - // into the 'Message' we provided to 'RequestAsyncUnary' above. - OnRequest(); - OnReadsDone(); - break; - } -} - -void Rpc::Write(std::unique_ptr<::google::protobuf::Message> message) { - EnqueueMessage(SendItem{std::move(message), ::grpc::Status::OK}); - event_queue_->Push(UniqueEventPtr( - new InternalRpcEvent(Event::WRITE_NEEDED, weak_ptr_factory_(this)))); -} - -void Rpc::Finish(::grpc::Status status) { - EnqueueMessage(SendItem{nullptr /* message */, status}); - event_queue_->Push(UniqueEventPtr( - new InternalRpcEvent(Event::WRITE_NEEDED, weak_ptr_factory_(this)))); -} - -void Rpc::HandleSendQueue() { - SendItem send_item; - { - common::MutexLocker locker(&send_queue_lock_); - if (send_queue_.empty() || IsRpcEventPending(Event::WRITE) || - IsRpcEventPending(Event::FINISH)) { - return; - } - - send_item = std::move(send_queue_.front()); - send_queue_.pop(); - } - if (!send_item.msg || - rpc_handler_info_.rpc_type == ::grpc::internal::RpcMethod::NORMAL_RPC || - rpc_handler_info_.rpc_type == - ::grpc::internal::RpcMethod::CLIENT_STREAMING) { - PerformFinish(std::move(send_item.msg), send_item.status); - return; - } - PerformWrite(std::move(send_item.msg), send_item.status); -} - -::grpc::internal::ServerAsyncStreamingInterface* Rpc::streaming_interface() { - switch (rpc_handler_info_.rpc_type) { - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - return server_async_reader_writer_.get(); - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - return server_async_reader_.get(); - case ::grpc::internal::RpcMethod::NORMAL_RPC: - return server_async_response_writer_.get(); - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - return server_async_writer_.get(); - } - LOG(FATAL) << "Never reached."; -} - -::grpc::internal::AsyncReaderInterface<::google::protobuf::Message>* -Rpc::async_reader_interface() { - switch (rpc_handler_info_.rpc_type) { - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - return server_async_reader_writer_.get(); - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - return server_async_reader_.get(); - case ::grpc::internal::RpcMethod::NORMAL_RPC: - LOG(FATAL) << "For NORMAL_RPC no streaming reader interface exists."; - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - LOG(FATAL) - << "For SERVER_STREAMING no streaming reader interface exists."; - } - LOG(FATAL) << "Never reached."; -} - -::grpc::internal::AsyncWriterInterface<::google::protobuf::Message>* -Rpc::async_writer_interface() { - switch (rpc_handler_info_.rpc_type) { - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - return server_async_reader_writer_.get(); - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - case ::grpc::internal::RpcMethod::NORMAL_RPC: - LOG(FATAL) << "For NORMAL_RPC and CLIENT_STREAMING no streaming writer " - "interface exists."; - break; - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - return server_async_writer_.get(); - } - LOG(FATAL) << "Never reached."; -} - -Rpc::CompletionQueueRpcEvent* Rpc::GetRpcEvent(Event event) { - switch (event) { - case Event::NEW_CONNECTION: - return &new_connection_event_; - case Event::READ: - return &read_event_; - case Event::WRITE_NEEDED: - LOG(FATAL) << "Rpc does not store Event::WRITE_NEEDED."; - break; - case Event::WRITE: - return &write_event_; - case Event::FINISH: - return &finish_event_; - case Event::DONE: - return &done_event_; - } - LOG(FATAL) << "Never reached."; -} - -bool* Rpc::GetRpcEventState(Event event) { - return &GetRpcEvent(event)->pending; -} - -void Rpc::EnqueueMessage(SendItem&& send_item) { - common::MutexLocker locker(&send_queue_lock_); - send_queue_.emplace(std::move(send_item)); -} - -void Rpc::PerformFinish(std::unique_ptr<::google::protobuf::Message> message, - ::grpc::Status status) { - SetRpcEventState(Event::FINISH, true); - switch (rpc_handler_info_.rpc_type) { - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - CHECK(!message); - server_async_reader_writer_->Finish(status, GetRpcEvent(Event::FINISH)); - break; - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - response_ = std::move(message); - SendUnaryFinish(server_async_reader_.get(), status, response_.get(), - GetRpcEvent(Event::FINISH)); - break; - case ::grpc::internal::RpcMethod::NORMAL_RPC: - response_ = std::move(message); - SendUnaryFinish(server_async_response_writer_.get(), status, - response_.get(), GetRpcEvent(Event::FINISH)); - break; - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - CHECK(!message); - server_async_writer_->Finish(status, GetRpcEvent(Event::FINISH)); - break; - } -} - -void Rpc::PerformWrite(std::unique_ptr<::google::protobuf::Message> message, - ::grpc::Status status) { - CHECK(message) << "PerformWrite must be called with a non-null message"; - CHECK_NE(rpc_handler_info_.rpc_type, ::grpc::internal::RpcMethod::NORMAL_RPC); - CHECK_NE(rpc_handler_info_.rpc_type, - ::grpc::internal::RpcMethod::CLIENT_STREAMING); - SetRpcEventState(Event::WRITE, true); - response_ = std::move(message); - async_writer_interface()->Write(*response_, GetRpcEvent(Event::WRITE)); -} - -void Rpc::SetRpcEventState(Event event, bool pending) { - // TODO(gaschler): Since the only usage is setting this true at creation, - // consider removing this method. - *GetRpcEventState(event) = pending; -} - -bool Rpc::IsRpcEventPending(Event event) { return *GetRpcEventState(event); } - -bool Rpc::IsAnyEventPending() { - return IsRpcEventPending(Rpc::Event::DONE) || - IsRpcEventPending(Rpc::Event::READ) || - IsRpcEventPending(Rpc::Event::WRITE) || - IsRpcEventPending(Rpc::Event::FINISH); -} - -std::weak_ptr Rpc::GetWeakPtr() { return weak_ptr_factory_(this); } - -ActiveRpcs::ActiveRpcs() : lock_() {} - -void Rpc::InitializeReadersAndWriters( - ::grpc::internal::RpcMethod::RpcType rpc_type) { - switch (rpc_type) { - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - server_async_reader_writer_ = - common::make_unique<::grpc::ServerAsyncReaderWriter< - google::protobuf::Message, google::protobuf::Message>>( - &server_context_); - break; - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - server_async_reader_ = common::make_unique<::grpc::ServerAsyncReader< - google::protobuf::Message, google::protobuf::Message>>( - &server_context_); - break; - case ::grpc::internal::RpcMethod::NORMAL_RPC: - server_async_response_writer_ = common::make_unique< - ::grpc::ServerAsyncResponseWriter>( - &server_context_); - break; - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - server_async_writer_ = common::make_unique< - ::grpc::ServerAsyncWriter>( - &server_context_); - break; - } -} - -ActiveRpcs::~ActiveRpcs() { - common::MutexLocker locker(&lock_); - if (!rpcs_.empty()) { - LOG(FATAL) << "RPCs still in flight!"; - } -} - -std::shared_ptr ActiveRpcs::Add(std::unique_ptr rpc) { - common::MutexLocker locker(&lock_); - std::shared_ptr shared_ptr_rpc = std::move(rpc); - const auto result = rpcs_.emplace(shared_ptr_rpc.get(), shared_ptr_rpc); - CHECK(result.second) << "RPC already active."; - return shared_ptr_rpc; -} - -bool ActiveRpcs::Remove(Rpc* rpc) { - common::MutexLocker locker(&lock_); - auto it = rpcs_.find(rpc); - if (it != rpcs_.end()) { - rpcs_.erase(it); - return true; - } - return false; -} - -Rpc::WeakPtrFactory ActiveRpcs::GetWeakPtrFactory() { - return [this](Rpc* rpc) { return GetWeakPtr(rpc); }; -} - -std::weak_ptr ActiveRpcs::GetWeakPtr(Rpc* rpc) { - common::MutexLocker locker(&lock_); - auto it = rpcs_.find(rpc); - CHECK(it != rpcs_.end()); - return it->second; -} - -} // namespace framework -} // namespace cloud -} // namespace cartographer diff --git a/cartographer/cloud/internal/framework/rpc.h b/cartographer/cloud/internal/framework/rpc.h deleted file mode 100644 index 2184136..0000000 --- a/cartographer/cloud/internal/framework/rpc.h +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_H -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_H - -#include -#include -#include - -#include "cartographer/cloud/internal/framework/execution_context.h" -#include "cartographer/cloud/internal/framework/rpc_handler_interface.h" -#include "cartographer/common/blocking_queue.h" -#include "cartographer/common/mutex.h" -#include "google/protobuf/message.h" -#include "grpc++/grpc++.h" -#include "grpc++/impl/codegen/async_stream.h" -#include "grpc++/impl/codegen/async_unary_call.h" -#include "grpc++/impl/codegen/proto_utils.h" -#include "grpc++/impl/codegen/service_type.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -class Service; -// TODO(cschuet): Add a unittest that tests the logic of this class. -class Rpc { - public: - using WeakPtrFactory = std::function(Rpc*)>; - enum class Event { - NEW_CONNECTION = 0, - READ, - WRITE_NEEDED, - WRITE, - FINISH, - DONE - }; - - struct EventBase { - explicit EventBase(Event event) : event(event) {} - virtual ~EventBase(){}; - virtual void Handle() = 0; - - const Event event; - }; - - class EventDeleter { - public: - enum Action { DELETE = 0, DO_NOT_DELETE }; - - // The default action 'DELETE' is used implicitly, for instance for a - // new UniqueEventPtr or a UniqueEventPtr that is created by - // 'return nullptr'. - EventDeleter() : action_(DELETE) {} - explicit EventDeleter(Action action) : action_(action) {} - void operator()(EventBase* e) { - if (e != nullptr && action_ == DELETE) { - delete e; - } - } - - private: - Action action_; - }; - - using UniqueEventPtr = std::unique_ptr; - using EventQueue = common::BlockingQueue; - - // Flows through gRPC's CompletionQueue and then our EventQueue. - struct CompletionQueueRpcEvent : public EventBase { - CompletionQueueRpcEvent(Event event, Rpc* rpc) - : EventBase(event), rpc_ptr(rpc), ok(false), pending(false) {} - void PushToEventQueue() { - rpc_ptr->event_queue()->Push( - UniqueEventPtr(this, EventDeleter(EventDeleter::DO_NOT_DELETE))); - } - void Handle() override; - - Rpc* rpc_ptr; - bool ok; - bool pending; - }; - - // Flows only through our EventQueue. - struct InternalRpcEvent : public EventBase { - InternalRpcEvent(Event event, std::weak_ptr rpc) - : EventBase(event), rpc(rpc) {} - void Handle() override; - - std::weak_ptr rpc; - }; - - Rpc(int method_index, ::grpc::ServerCompletionQueue* server_completion_queue, - EventQueue* event_queue, ExecutionContext* execution_context, - const RpcHandlerInfo& rpc_handler_info, Service* service, - WeakPtrFactory weak_ptr_factory); - std::unique_ptr Clone(); - void OnRequest(); - void OnReadsDone(); - void OnFinish(); - void RequestNextMethodInvocation(); - void RequestStreamingReadIfNeeded(); - void HandleSendQueue(); - void Write(std::unique_ptr<::google::protobuf::Message> message); - void Finish(::grpc::Status status); - Service* service() { return service_; } - bool IsRpcEventPending(Event event); - bool IsAnyEventPending(); - void SetEventQueue(EventQueue* event_queue) { event_queue_ = event_queue; } - EventQueue* event_queue() { return event_queue_; } - std::weak_ptr GetWeakPtr(); - - private: - struct SendItem { - std::unique_ptr msg; - ::grpc::Status status; - }; - - Rpc(const Rpc&) = delete; - Rpc& operator=(const Rpc&) = delete; - void InitializeReadersAndWriters( - ::grpc::internal::RpcMethod::RpcType rpc_type); - CompletionQueueRpcEvent* GetRpcEvent(Event event); - bool* GetRpcEventState(Event event); - void SetRpcEventState(Event event, bool pending); - void EnqueueMessage(SendItem&& send_item); - void PerformFinish(std::unique_ptr<::google::protobuf::Message> message, - ::grpc::Status status); - void PerformWrite(std::unique_ptr<::google::protobuf::Message> message, - ::grpc::Status status); - - ::grpc::internal::AsyncReaderInterface<::google::protobuf::Message>* - async_reader_interface(); - ::grpc::internal::AsyncWriterInterface<::google::protobuf::Message>* - async_writer_interface(); - - ::grpc::internal::ServerAsyncStreamingInterface* streaming_interface(); - - int method_index_; - ::grpc::ServerCompletionQueue* server_completion_queue_; - EventQueue* event_queue_; - ExecutionContext* execution_context_; - RpcHandlerInfo rpc_handler_info_; - Service* service_; - WeakPtrFactory weak_ptr_factory_; - ::grpc::ServerContext server_context_; - - CompletionQueueRpcEvent new_connection_event_; - CompletionQueueRpcEvent read_event_; - CompletionQueueRpcEvent write_event_; - CompletionQueueRpcEvent finish_event_; - CompletionQueueRpcEvent done_event_; - - std::unique_ptr request_; - std::unique_ptr response_; - - std::unique_ptr handler_; - - std::unique_ptr<::grpc::ServerAsyncResponseWriter> - server_async_response_writer_; - std::unique_ptr<::grpc::ServerAsyncReader> - server_async_reader_; - std::unique_ptr<::grpc::ServerAsyncReaderWriter> - server_async_reader_writer_; - std::unique_ptr<::grpc::ServerAsyncWriter> - server_async_writer_; - - common::Mutex send_queue_lock_; - std::queue send_queue_; -}; - -using EventQueue = Rpc::EventQueue; - -// This class keeps track of all in-flight RPCs for a 'Service'. Make sure that -// all RPCs have been terminated and removed from this object before it goes out -// of scope. -class ActiveRpcs { - public: - ActiveRpcs(); - ~ActiveRpcs() EXCLUDES(lock_); - - std::shared_ptr Add(std::unique_ptr rpc) EXCLUDES(lock_); - bool Remove(Rpc* rpc) EXCLUDES(lock_); - Rpc::WeakPtrFactory GetWeakPtrFactory(); - - private: - std::weak_ptr GetWeakPtr(Rpc* rpc); - - common::Mutex lock_; - std::map> rpcs_; -}; - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_H diff --git a/cartographer/cloud/internal/framework/rpc_handler.h b/cartographer/cloud/internal/framework/rpc_handler.h deleted file mode 100644 index 4024f61..0000000 --- a/cartographer/cloud/internal/framework/rpc_handler.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_H -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_H - -#include "cartographer/cloud/internal/framework/execution_context.h" -#include "cartographer/cloud/internal/framework/rpc.h" -#include "cartographer/cloud/internal/framework/rpc_handler_interface.h" -#include "cartographer/cloud/internal/framework/type_traits.h" -#include "glog/logging.h" -#include "google/protobuf/message.h" -#include "grpc++/grpc++.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -template -class RpcHandler : public RpcHandlerInterface { - public: - using IncomingType = Incoming; - using OutgoingType = Outgoing; - using RequestType = StripStream; - using ResponseType = StripStream; - - class Writer { - public: - explicit Writer(std::weak_ptr rpc) : rpc_(std::move(rpc)) {} - bool Write(std::unique_ptr message) const { - if (auto rpc = rpc_.lock()) { - rpc->Write(std::move(message)); - return true; - } - return false; - } - bool WritesDone() const { - if (auto rpc = rpc_.lock()) { - rpc->Finish(::grpc::Status::OK); - return true; - } - return false; - } - - private: - const std::weak_ptr rpc_; - }; - void SetExecutionContext(ExecutionContext* execution_context) override { - execution_context_ = execution_context; - } - void SetRpc(Rpc* rpc) override { rpc_ = rpc; } - void OnRequestInternal(const ::google::protobuf::Message* request) override { - DCHECK(dynamic_cast(request)); - OnRequest(static_cast(*request)); - } - virtual void OnRequest(const RequestType& request) = 0; - void Finish(::grpc::Status status) { rpc_->Finish(status); } - void Send(std::unique_ptr response) { - rpc_->Write(std::move(response)); - } - template - ExecutionContext::Synchronized GetContext() { - return {execution_context_->lock(), execution_context_}; - } - template - T* GetUnsynchronizedContext() { - return dynamic_cast(execution_context_); - } - Writer GetWriter() { return Writer(rpc_->GetWeakPtr()); } - - private: - Rpc* rpc_; - ExecutionContext* execution_context_; -}; - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_H diff --git a/cartographer/cloud/internal/framework/rpc_handler_interface.h b/cartographer/cloud/internal/framework/rpc_handler_interface.h deleted file mode 100644 index 685cc5a..0000000 --- a/cartographer/cloud/internal/framework/rpc_handler_interface.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_INTERFACE_H_ -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_INTERFACE_H_ - -#include "cartographer/cloud/internal/framework/execution_context.h" -#include "cartographer/common/make_unique.h" -#include "google/protobuf/message.h" -#include "grpc++/grpc++.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -class Rpc; -class RpcHandlerInterface { - public: - virtual ~RpcHandlerInterface() = default; - // Returns the fully qualified name of the gRPC method this handler is - // implementing. The fully qualified name has the structure - // '/<>/<>', where the service name is the - // fully qualified proto package name of the service and method name the name - // of the method as defined in the service definition of the proto. - virtual std::string method_name() const = 0; - virtual void SetExecutionContext(ExecutionContext* execution_context) = 0; - virtual void SetRpc(Rpc* rpc) = 0; - virtual void OnRequestInternal( - const ::google::protobuf::Message* request) = 0; - virtual void OnReadsDone(){}; - virtual void OnFinish(){}; - template - static std::unique_ptr Instantiate() { - return common::make_unique(); - } -}; - -using RpcHandlerFactory = std::function( - Rpc*, ExecutionContext*)>; - -struct RpcHandlerInfo { - const google::protobuf::Descriptor* request_descriptor; - const google::protobuf::Descriptor* response_descriptor; - const RpcHandlerFactory rpc_handler_factory; - const ::grpc::internal::RpcMethod::RpcType rpc_type; - const std::string fully_qualified_name; -}; - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_RPC_HANDLER_INTERFACE_H_ diff --git a/cartographer/cloud/internal/framework/server.cc b/cartographer/cloud/internal/framework/server.cc deleted file mode 100644 index c54cd41..0000000 --- a/cartographer/cloud/internal/framework/server.cc +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "cartographer/cloud/internal/framework/server.h" - -#include "glog/logging.h" - -namespace cartographer { -namespace cloud { -namespace framework { -namespace { - -const common::Duration kPopEventTimeout = common::FromMilliseconds(100); - -} // namespace - -void Server::Builder::SetNumGrpcThreads(const size_t num_grpc_threads) { - options_.num_grpc_threads = num_grpc_threads; -} - -void Server::Builder::SetNumEventThreads(const std::size_t num_event_threads) { - options_.num_event_threads = num_event_threads; -} - -void Server::Builder::SetServerAddress(const std::string& server_address) { - options_.server_address = server_address; -} - -std::tuple Server::Builder::ParseMethodFullName( - const std::string& method_full_name) { - CHECK(method_full_name.at(0) == '/') << "Invalid method name."; - std::stringstream stream(method_full_name.substr(1)); - std::string service_full_name; - std::getline(stream, service_full_name, '/'); - std::string method_name; - std::getline(stream, method_name, '/'); - CHECK(!service_full_name.empty() && !method_name.empty()); - return std::make_tuple(service_full_name, method_name); -} - -std::unique_ptr Server::Builder::Build() { - std::unique_ptr server(new Server(options_)); - for (const auto& service_handlers : rpc_handlers_) { - server->AddService(service_handlers.first, service_handlers.second); - } - return server; -} - -Server::Server(const Options& options) : options_(options) { - server_builder_.AddListeningPort(options_.server_address, - ::grpc::InsecureServerCredentials()); - - // Set up event queue threads. - event_queue_threads_ = - std::vector(options_.num_event_threads); - - // Set up completion queues threads. - for (size_t i = 0; i < options_.num_grpc_threads; ++i) { - completion_queue_threads_.emplace_back( - server_builder_.AddCompletionQueue()); - } -} - -void Server::AddService( - const std::string& service_name, - const std::map& rpc_handler_infos) { - // Instantiate and register service. - const auto result = services_.emplace( - std::piecewise_construct, std::make_tuple(service_name), - std::make_tuple(service_name, rpc_handler_infos, - [this]() { return SelectNextEventQueueRoundRobin(); })); - CHECK(result.second) << "A service named " << service_name - << " already exists."; - server_builder_.RegisterService(&result.first->second); -} - -void Server::RunCompletionQueue( - ::grpc::ServerCompletionQueue* completion_queue) { - bool ok; - void* tag; - while (completion_queue->Next(&tag, &ok)) { - auto* rpc_event = static_cast(tag); - rpc_event->ok = ok; - rpc_event->PushToEventQueue(); - } -} - -EventQueue* Server::SelectNextEventQueueRoundRobin() { - common::MutexLocker locker(¤t_event_queue_id_lock_); - current_event_queue_id_ = - (current_event_queue_id_ + 1) % options_.num_event_threads; - return event_queue_threads_.at(current_event_queue_id_).event_queue(); -} - -void Server::RunEventQueue(EventQueue* event_queue) { - while (!shutting_down_) { - Rpc::UniqueEventPtr rpc_event = - event_queue->PopWithTimeout(kPopEventTimeout); - if (rpc_event) { - rpc_event->Handle(); - } - } - - // Finish processing the rest of the items. - while (Rpc::UniqueEventPtr rpc_event = - event_queue->PopWithTimeout(kPopEventTimeout)) { - rpc_event->Handle(); - } -} - -void Server::Start() { - // Start the gRPC server process. - server_ = server_builder_.BuildAndStart(); - - // Start serving all services on all completion queues. - for (auto& service : services_) { - service.second.StartServing(completion_queue_threads_, - execution_context_.get()); - } - - // Start threads to process all event queues. - for (auto& event_queue_thread : event_queue_threads_) { - event_queue_thread.Start( - [this](EventQueue* event_queue) { RunEventQueue(event_queue); }); - } - - // Start threads to process all completion queues. - for (auto& completion_queue_threads : completion_queue_threads_) { - completion_queue_threads.Start( - [this](::grpc::ServerCompletionQueue* completion_queue) { - RunCompletionQueue(completion_queue); - }); - } -} - -void Server::WaitForShutdown() { - if (!server_) { - return; - } - - server_->Wait(); -} - -void Server::Shutdown() { - LOG(INFO) << "Shutting down server."; - shutting_down_ = true; - - // Tell the services to stop serving RPCs. - for (auto& service : services_) { - service.second.StopServing(); - } - - // Shut down the gRPC server waiting for RPCs to finish until the hard - // deadline; then force a shutdown. - server_->Shutdown(); - - // Shut down the server completion queues and wait for the processing threads - // to join. - for (auto& completion_queue_threads : completion_queue_threads_) { - completion_queue_threads.Shutdown(); - } - - for (auto& event_queue_thread : event_queue_threads_) { - event_queue_thread.Shutdown(); - } - - LOG(INFO) << "Shutdown complete."; -} - -void Server::SetExecutionContext( - std::unique_ptr execution_context) { - // After the server has been started the 'ExecutionHandle' cannot be changed - // anymore. - CHECK(!server_); - execution_context_ = std::move(execution_context); -} - -} // namespace framework -} // namespace cloud -} // namespace cartographer diff --git a/cartographer/cloud/internal/framework/server.h b/cartographer/cloud/internal/framework/server.h deleted file mode 100644 index e8e83c1..0000000 --- a/cartographer/cloud/internal/framework/server.h +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVER_H -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVER_H - -#include -#include -#include -#include -#include - -#include "cartographer/cloud/internal/framework/completion_queue_thread.h" -#include "cartographer/cloud/internal/framework/event_queue_thread.h" -#include "cartographer/cloud/internal/framework/execution_context.h" -#include "cartographer/cloud/internal/framework/rpc_handler.h" -#include "cartographer/cloud/internal/framework/service.h" -#include "cartographer/common/make_unique.h" -#include "grpc++/grpc++.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -class Server { - protected: - // All options that configure server behaviour such as number of threads, - // ports etc. - struct Options { - size_t num_grpc_threads; - size_t num_event_threads; - std::string server_address; - }; - - public: - // This 'Builder' is the only way to construct a 'Server'. - class Builder { - public: - Builder() = default; - - std::unique_ptr Build(); - void SetNumGrpcThreads(std::size_t num_grpc_threads); - void SetNumEventThreads(std::size_t num_event_threads); - void SetServerAddress(const std::string& server_address); - - template - void RegisterHandler() { - std::string method_full_name = - RpcHandlerInterface::Instantiate()->method_name(); - std::string service_full_name; - std::string method_name; - std::tie(service_full_name, method_name) = - ParseMethodFullName(method_full_name); - CheckHandlerCompatibility(service_full_name, method_name); - rpc_handlers_[service_full_name].emplace( - method_name, - RpcHandlerInfo{ - RpcHandlerType::RequestType::default_instance().GetDescriptor(), - RpcHandlerType::ResponseType::default_instance().GetDescriptor(), - [](Rpc* const rpc, ExecutionContext* const execution_context) { - std::unique_ptr rpc_handler = - common::make_unique(); - rpc_handler->SetRpc(rpc); - rpc_handler->SetExecutionContext(execution_context); - return rpc_handler; - }, - RpcType::value, - method_full_name}); - } - static std::tuple - ParseMethodFullName(const std::string& method_full_name); - - private: - using ServiceInfo = std::map; - - template - void CheckHandlerCompatibility(const std::string& service_full_name, - const std::string& method_name) { - const auto* pool = google::protobuf::DescriptorPool::generated_pool(); - const auto* service = pool->FindServiceByName(service_full_name); - CHECK(service) << "Unknown service " << service_full_name; - const auto* method_descriptor = service->FindMethodByName(method_name); - CHECK(method_descriptor) << "Unknown method " << method_name - << " in service " << service_full_name; - const auto* request_type = method_descriptor->input_type(); - CHECK_EQ(RpcHandlerType::RequestType::default_instance().GetDescriptor(), - request_type); - const auto* response_type = method_descriptor->output_type(); - CHECK_EQ(RpcHandlerType::ResponseType::default_instance().GetDescriptor(), - response_type); - const auto rpc_type = - RpcType::value; - switch (rpc_type) { - case ::grpc::internal::RpcMethod::NORMAL_RPC: - CHECK(!method_descriptor->client_streaming()); - CHECK(!method_descriptor->server_streaming()); - break; - case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - CHECK(method_descriptor->client_streaming()); - CHECK(!method_descriptor->server_streaming()); - break; - case ::grpc::internal::RpcMethod::SERVER_STREAMING: - CHECK(!method_descriptor->client_streaming()); - CHECK(method_descriptor->server_streaming()); - break; - case ::grpc::internal::RpcMethod::BIDI_STREAMING: - CHECK(method_descriptor->client_streaming()); - CHECK(method_descriptor->server_streaming()); - break; - } - } - - Options options_; - std::map rpc_handlers_; - }; - friend class Builder; - virtual ~Server() = default; - - // Starts a server starts serving the registered services. - void Start(); - - // Waits for the server to shut down. Note: The server must be either shutting - // down or some other thread must call 'Shutdown()' for this function to ever - // return. - void WaitForShutdown(); - - // Shuts down the server and all of its services. - void Shutdown(); - - // Sets the server-wide context object shared between RPC handlers. - void SetExecutionContext(std::unique_ptr execution_context); - - template - ExecutionContext::Synchronized GetContext() { - return {execution_context_->lock(), execution_context_.get()}; - } - - template - T* GetUnsynchronizedContext() { - return dynamic_cast(execution_context_.get()); - } - - protected: - Server(const Options& options); - void AddService( - const std::string& service_name, - const std::map& rpc_handler_infos); - - private: - Server(const Server&) = delete; - Server& operator=(const Server&) = delete; - void RunCompletionQueue(::grpc::ServerCompletionQueue* completion_queue); - void RunEventQueue(Rpc::EventQueue* event_queue); - Rpc::EventQueue* SelectNextEventQueueRoundRobin(); - - Options options_; - - bool shutting_down_ = false; - - // gRPC objects needed to build a server. - ::grpc::ServerBuilder server_builder_; - std::unique_ptr<::grpc::Server> server_; - - // Threads processing the completion queues. - std::vector completion_queue_threads_; - - // Threads processing RPC events. - std::vector event_queue_threads_; - common::Mutex current_event_queue_id_lock_; - int current_event_queue_id_ = 0; - - // Map of service names to services. - std::map services_; - - // A context object that is shared between all implementations of - // 'RpcHandler'. - std::unique_ptr execution_context_; -}; - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVER_H diff --git a/cartographer/cloud/internal/framework/server_test.cc b/cartographer/cloud/internal/framework/server_test.cc deleted file mode 100644 index 62a736a..0000000 --- a/cartographer/cloud/internal/framework/server_test.cc +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "cartographer/cloud/internal/framework/server.h" - -#include - -#include "cartographer/cloud/internal/framework/client.h" -#include "cartographer/cloud/internal/framework/execution_context.h" -#include "cartographer/cloud/internal/framework/proto/math_service.pb.h" -#include "cartographer/cloud/internal/framework/rpc_handler.h" -#include "glog/logging.h" -#include "google/protobuf/descriptor.h" -#include "grpc++/grpc++.h" -#include "gtest/gtest.h" - -namespace cartographer { -namespace cloud { -namespace framework { -namespace { - -using EchoResponder = std::function; - -class MathServerContext : public ExecutionContext { - public: - int additional_increment() { return 10; } - std::promise echo_responder; -}; - -class GetSumHandler - : public RpcHandler, proto::GetSumResponse> { - public: - std::string method_name() const override { - return "/cartographer.cloud.framework.proto.Math/GetSum"; - } - void OnRequest(const proto::GetSumRequest& request) override { - sum_ += GetContext()->additional_increment(); - sum_ += request.input(); - } - - void OnReadsDone() override { - auto response = common::make_unique(); - response->set_output(sum_); - Send(std::move(response)); - } - - private: - int sum_ = 0; -}; - -class GetRunningSumHandler : public RpcHandler, - Stream> { - public: - std::string method_name() const override { - return "/cartographer.cloud.framework.proto.Math/GetRunningSum"; - } - void OnRequest(const proto::GetSumRequest& request) override { - sum_ += request.input(); - - // Respond twice to demonstrate bidirectional streaming. - auto response = common::make_unique(); - response->set_output(sum_); - Send(std::move(response)); - response = common::make_unique(); - response->set_output(sum_); - Send(std::move(response)); - } - - void OnReadsDone() override { Finish(::grpc::Status::OK); } - - private: - int sum_ = 0; -}; - -class GetSquareHandler - : public RpcHandler { - public: - std::string method_name() const override { - return "/cartographer.cloud.framework.proto.Math/GetSquare"; - } - void OnRequest(const proto::GetSquareRequest& request) override { - auto response = common::make_unique(); - response->set_output(request.input() * request.input()); - Send(std::move(response)); - } -}; - -class GetEchoHandler - : public RpcHandler { - public: - std::string method_name() const override { - return "/cartographer.cloud.framework.proto.Math/GetEcho"; - } - void OnRequest(const proto::GetEchoRequest& request) override { - int value = request.input(); - Writer writer = GetWriter(); - GetContext()->echo_responder.set_value( - [writer, value]() { - auto response = common::make_unique(); - response->set_output(value); - return writer.Write(std::move(response)); - }); - } -}; - -class GetSequenceHandler - : public RpcHandler> { - public: - std::string method_name() const override { - return "/cartographer.cloud.framework.proto.Math/GetSequence"; - } - void OnRequest(const proto::GetSequenceRequest& request) override { - for (int i = 0; i < request.input(); ++i) { - auto response = common::make_unique(); - response->set_output(i); - Send(std::move(response)); - } - Finish(::grpc::Status::OK); - } -}; - -// TODO(cschuet): Due to the hard-coded part these tests will become flaky when -// run in parallel. It would be nice to find a way to solve that. gRPC also -// allows to communicate over UNIX domain sockets. -const std::string kServerAddress = "localhost:50051"; -const std::size_t kNumThreads = 1; - -class ServerTest : public ::testing::Test { - protected: - void SetUp() override { - Server::Builder server_builder; - server_builder.SetServerAddress(kServerAddress); - server_builder.SetNumGrpcThreads(kNumThreads); - server_builder.SetNumEventThreads(kNumThreads); - server_builder.RegisterHandler(); - server_builder.RegisterHandler(); - server_builder.RegisterHandler(); - server_builder.RegisterHandler(); - server_builder.RegisterHandler(); - server_ = server_builder.Build(); - - client_channel_ = ::grpc::CreateChannel( - kServerAddress, ::grpc::InsecureChannelCredentials()); - } - - std::unique_ptr server_; - std::shared_ptr<::grpc::Channel> client_channel_; -}; - -TEST_F(ServerTest, StartAndStopServerTest) { - server_->Start(); - server_->Shutdown(); -} - -TEST_F(ServerTest, ProcessRpcStreamTest) { - server_->SetExecutionContext(common::make_unique()); - server_->Start(); - - Client client(client_channel_); - for (int i = 0; i < 3; ++i) { - proto::GetSumRequest request; - request.set_input(i); - EXPECT_TRUE(client.Write(request)); - } - EXPECT_TRUE(client.WritesDone()); - EXPECT_TRUE(client.Finish().ok()); - EXPECT_EQ(client.response().output(), 33); - - server_->Shutdown(); -} - -TEST_F(ServerTest, ProcessUnaryRpcTest) { - server_->Start(); - - Client client(client_channel_); - proto::GetSquareRequest request; - request.set_input(11); - EXPECT_TRUE(client.Write(request)); - EXPECT_EQ(client.response().output(), 121); - - server_->Shutdown(); -} - -TEST_F(ServerTest, ProcessBidiStreamingRpcTest) { - server_->Start(); - - Client client(client_channel_); - for (int i = 0; i < 3; ++i) { - proto::GetSumRequest request; - request.set_input(i); - EXPECT_TRUE(client.Write(request)); - } - client.WritesDone(); - proto::GetSumResponse response; - std::list expected_responses = {0, 0, 1, 1, 3, 3}; - while (client.Read(&response)) { - EXPECT_EQ(expected_responses.front(), response.output()); - expected_responses.pop_front(); - } - EXPECT_TRUE(expected_responses.empty()); - EXPECT_TRUE(client.Finish().ok()); - - server_->Shutdown(); -} - -TEST_F(ServerTest, WriteFromOtherThread) { - server_->SetExecutionContext(common::make_unique()); - server_->Start(); - - Server* server = server_.get(); - std::thread response_thread([server]() { - std::future responder_future = - server->GetContext()->echo_responder.get_future(); - responder_future.wait(); - auto responder = responder_future.get(); - CHECK(responder()); - }); - - Client client(client_channel_); - proto::GetEchoRequest request; - request.set_input(13); - EXPECT_TRUE(client.Write(request)); - response_thread.join(); - EXPECT_EQ(client.response().output(), 13); - - server_->Shutdown(); -} - -TEST_F(ServerTest, ProcessServerStreamingRpcTest) { - server_->Start(); - - Client client(client_channel_); - proto::GetSequenceRequest request; - request.set_input(12); - - client.Write(request); - proto::GetSequenceResponse response; - for (int i = 0; i < 12; ++i) { - EXPECT_TRUE(client.Read(&response)); - EXPECT_EQ(response.output(), i); - } - EXPECT_FALSE(client.Read(&response)); - EXPECT_TRUE(client.Finish().ok()); - - server_->Shutdown(); -} - -} // namespace -} // namespace framework -} // namespace cloud -} // namespace cartographer diff --git a/cartographer/cloud/internal/framework/service.cc b/cartographer/cloud/internal/framework/service.cc deleted file mode 100644 index 9305c5b..0000000 --- a/cartographer/cloud/internal/framework/service.cc +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "cartographer/cloud/internal/framework/server.h" - -#include - -#include "glog/logging.h" -#include "grpc++/impl/codegen/proto_utils.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -Service::Service(const std::string& service_name, - const std::map& rpc_handler_infos, - EventQueueSelector event_queue_selector) - : rpc_handler_infos_(rpc_handler_infos), - event_queue_selector_(event_queue_selector) { - for (const auto& rpc_handler_info : rpc_handler_infos_) { - // The 'handler' below is set to 'nullptr' indicating that we want to - // handle this method asynchronously. - this->AddMethod(new ::grpc::internal::RpcServiceMethod( - rpc_handler_info.second.fully_qualified_name.c_str(), - rpc_handler_info.second.rpc_type, nullptr /* handler */)); - } -} - -void Service::StartServing( - std::vector& completion_queue_threads, - ExecutionContext* execution_context) { - int i = 0; - for (const auto& rpc_handler_info : rpc_handler_infos_) { - for (auto& completion_queue_thread : completion_queue_threads) { - std::shared_ptr rpc = active_rpcs_.Add(common::make_unique( - i, completion_queue_thread.completion_queue(), - event_queue_selector_(), execution_context, rpc_handler_info.second, - this, active_rpcs_.GetWeakPtrFactory())); - rpc->RequestNextMethodInvocation(); - } - ++i; - } -} - -void Service::StopServing() { shutting_down_ = true; } - -void Service::HandleEvent(Rpc::Event event, Rpc* rpc, bool ok) { - switch (event) { - case Rpc::Event::NEW_CONNECTION: - HandleNewConnection(rpc, ok); - break; - case Rpc::Event::READ: - HandleRead(rpc, ok); - break; - case Rpc::Event::WRITE_NEEDED: - case Rpc::Event::WRITE: - HandleWrite(rpc, ok); - break; - case Rpc::Event::FINISH: - HandleFinish(rpc, ok); - break; - case Rpc::Event::DONE: - HandleDone(rpc, ok); - break; - } -} - -void Service::HandleNewConnection(Rpc* rpc, bool ok) { - if (shutting_down_) { - if (ok) { - LOG(WARNING) << "Server shutting down. Refusing to handle new RPCs."; - } - active_rpcs_.Remove(rpc); - return; - } - - if (!ok) { - LOG(ERROR) << "Failed to establish connection for unknown reason."; - active_rpcs_.Remove(rpc); - } - - if (ok) { - // For request-streaming RPCs ask the client to start sending requests. - rpc->RequestStreamingReadIfNeeded(); - } - - // Create new active rpc to handle next connection and register it for the - // incoming connection. Assign event queue in a round-robin fashion. - std::unique_ptr new_rpc = rpc->Clone(); - new_rpc->SetEventQueue(event_queue_selector_()); - active_rpcs_.Add(std::move(new_rpc))->RequestNextMethodInvocation(); -} - -void Service::HandleRead(Rpc* rpc, bool ok) { - if (ok) { - rpc->OnRequest(); - rpc->RequestStreamingReadIfNeeded(); - return; - } - - // Reads completed. - rpc->OnReadsDone(); - - RemoveIfNotPending(rpc); -} - -void Service::HandleWrite(Rpc* rpc, bool ok) { - if (!ok) { - LOG(ERROR) << "Write failed"; - } - - // Send the next message or potentially finish the connection. - rpc->HandleSendQueue(); - - RemoveIfNotPending(rpc); -} - -void Service::HandleFinish(Rpc* rpc, bool ok) { - if (!ok) { - LOG(ERROR) << "Finish failed"; - } - - rpc->OnFinish(); - - RemoveIfNotPending(rpc); -} - -void Service::HandleDone(Rpc* rpc, bool ok) { RemoveIfNotPending(rpc); } - -void Service::RemoveIfNotPending(Rpc* rpc) { - if (!rpc->IsAnyEventPending()) { - active_rpcs_.Remove(rpc); - } -} - -} // namespace framework -} // namespace cloud -} // namespace cartographer diff --git a/cartographer/cloud/internal/framework/service.h b/cartographer/cloud/internal/framework/service.h deleted file mode 100644 index 74b3879..0000000 --- a/cartographer/cloud/internal/framework/service.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVICE_H -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVICE_H - -#include "cartographer/cloud/internal/framework/completion_queue_thread.h" -#include "cartographer/cloud/internal/framework/event_queue_thread.h" -#include "cartographer/cloud/internal/framework/execution_context.h" -#include "cartographer/cloud/internal/framework/rpc.h" -#include "cartographer/cloud/internal/framework/rpc_handler.h" -#include "grpc++/impl/codegen/service_type.h" - -namespace cartographer { -namespace cloud { -namespace framework { - -// A 'Service' represents a generic service for gRPC asynchronous methods and is -// responsible for managing the lifetime of active RPCs issued against methods -// of the service and distributing incoming gRPC events to their respective -// 'Rpc' handler objects. -class Service : public ::grpc::Service { - public: - using EventQueueSelector = std::function; - friend class Rpc; - - Service(const std::string& service_name, - const std::map& rpc_handlers, - EventQueueSelector event_queue_selector); - void StartServing(std::vector& completion_queues, - ExecutionContext* execution_context); - void HandleEvent(Rpc::Event event, Rpc* rpc, bool ok); - void StopServing(); - - private: - void HandleNewConnection(Rpc* rpc, bool ok); - void HandleRead(Rpc* rpc, bool ok); - void HandleWrite(Rpc* rpc, bool ok); - void HandleFinish(Rpc* rpc, bool ok); - void HandleDone(Rpc* rpc, bool ok); - - void RemoveIfNotPending(Rpc* rpc); - - std::map rpc_handler_infos_; - EventQueueSelector event_queue_selector_; - ActiveRpcs active_rpcs_; - bool shutting_down_ = false; -}; - -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_SERVICE_H diff --git a/cartographer/cloud/internal/framework/testing/rpc_handler_test_server.h b/cartographer/cloud/internal/framework/testing/rpc_handler_test_server.h deleted file mode 100644 index 7ea3a12..0000000 --- a/cartographer/cloud/internal/framework/testing/rpc_handler_test_server.h +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright 2018 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_TEST_SERVER_H_ -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_TEST_SERVER_H_ - -#include -#include - -#include "cartographer/cloud/internal/framework/client.h" -#include "cartographer/cloud/internal/framework/rpc_handler_interface.h" -#include "cartographer/cloud/internal/framework/server.h" -#include "cartographer/cloud/internal/framework/testing/rpc_handler_wrapper.h" -#include "cartographer/common/blocking_queue.h" -#include "grpc++/grpc++.h" -#include "gtest/gtest.h" - -namespace cartographer { -namespace cloud { -namespace framework { -namespace testing { - -namespace { -const std::string kServerAddress = "localhost:50051"; -} // namespace - -template -class RpcHandlerTestServer : public Server { - public: - RpcHandlerTestServer(std::unique_ptr execution_context) - : Server(Options{1, 1, kServerAddress}), - channel_(::grpc::CreateChannel(kServerAddress, - ::grpc::InsecureChannelCredentials())), - client_(channel_) { - std::string method_full_name_under_test = - RpcHandlerInterface::Instantiate()->method_name(); - std::string service_full_name; - std::string method_name; - std::tie(service_full_name, method_name) = - Server::Builder::ParseMethodFullName(method_full_name_under_test); - this->AddService( - service_full_name, - {{method_name, GetRpcHandlerInfo(method_full_name_under_test)}}); - this->SetExecutionContext(std::move(execution_context)); - this->Start(); - } - - ~RpcHandlerTestServer() { this->Shutdown(); }; - - void SendWrite(const typename RpcHandlerType::RequestType &message) { - EXPECT_TRUE(client_.Write(message)); - WaitForHandlerCompletion(RpcHandlerWrapper::ON_REQUEST); - } - - // Parses a request message from the passed string and issues the - // request against the handler, waits for the handler to complete - // processing before returning. - void SendWrite(const std::string &serialized_message) { - typename RpcHandlerType::RequestType message; - message.ParseFromString(serialized_message); - Write(message); - } - - // Sends a WRITES_DONE event to the handler, waits for the handler - // to finish processing the READS_DONE event before returning. - void SendWritesDone() { - EXPECT_TRUE(client_.WritesDone()); - WaitForHandlerCompletion(RpcHandlerWrapper::ON_READS_DONE); - } - - // Sends a FINISH event to the handler under test, waits for the - // handler to finish processing the event before returning. - void SendFinish() { - EXPECT_TRUE(client_.Finish().ok()); - WaitForHandlerCompletion(RpcHandlerWrapper::ON_FINISH); - } - - const typename RpcHandlerType::ResponseType &response() { - return client_.response(); - } - - private: - using ClientWriter = ::grpc::internal::ClientWriterFactory< - typename RpcHandlerType::RequestType>; - - void WaitForHandlerCompletion( - typename RpcHandlerWrapper::RpcHandlerEvent event) { - CHECK_EQ(rpc_handler_event_queue_.Pop(), event); - } - - RpcHandlerInfo GetRpcHandlerInfo(const std::string &method_full_name) { - ::grpc::internal::RpcMethod::RpcType rpc_type = - RpcType::value; - auto event_callback = - [this]( - typename RpcHandlerWrapper::RpcHandlerEvent event) { - rpc_handler_event_queue_.Push(event); - }; - auto handler_instantiator = [event_callback]( - Rpc *const rpc, - ExecutionContext *const execution_context) { - std::unique_ptr rpc_handler = - common::make_unique>( - event_callback); - rpc_handler->SetRpc(rpc); - rpc_handler->SetExecutionContext(execution_context); - return rpc_handler; - }; - return RpcHandlerInfo{ - RpcHandlerType::RequestType::default_instance().GetDescriptor(), - RpcHandlerType::ResponseType::default_instance().GetDescriptor(), - handler_instantiator, rpc_type, method_full_name}; - } - - std::shared_ptr<::grpc::Channel> channel_; - cloud::framework::Client client_; - common::BlockingQueue< - typename RpcHandlerWrapper::RpcHandlerEvent> - rpc_handler_event_queue_; -}; - -} // namespace testing -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_TEST_SERVER_H_ diff --git a/cartographer/cloud/internal/framework/testing/rpc_handler_wrapper.h b/cartographer/cloud/internal/framework/testing/rpc_handler_wrapper.h deleted file mode 100644 index cd4b28e..0000000 --- a/cartographer/cloud/internal/framework/testing/rpc_handler_wrapper.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2018 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_WRAPPER_H_ -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_WRAPPER_H_ - -#include - -namespace cartographer { -namespace cloud { -namespace framework { -namespace testing { - -template -class RpcHandlerWrapper : public RpcHandlerType { - public: - enum RpcHandlerEvent { ON_REQUEST, ON_READS_DONE, ON_FINISH }; - using EventCallback = std::function; - - RpcHandlerWrapper(EventCallback event_callback) - : event_callback_(event_callback) {} - - void OnRequest(const typename RpcHandlerType::RequestType &request) override { - RpcHandlerType::OnRequest(request); - event_callback_(ON_REQUEST); - } - - void OnReadsDone() override { - RpcHandlerType::OnReadsDone(); - event_callback_(ON_READS_DONE); - } - - void OnFinish() override { - RpcHandlerType::OnFinish(); - event_callback_(ON_FINISH); - } - - private: - EventCallback event_callback_; -}; - -} // namespace testing -} // namespace framework -} // namespace cloud -} // namespace cartographer - -#endif // CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TESTING_RPC_HANDLER_WRAPPER_H_ diff --git a/cartographer/cloud/internal/framework/type_traits.h b/cartographer/cloud/internal/framework/type_traits.h deleted file mode 100644 index c1a82e1..0000000 --- a/cartographer/cloud/internal/framework/type_traits.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2017 The Cartographer Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TYPE_TRAITS_H_ -#define CARTOGRAPHER_CLOUD_INTERNAL_FRAMEWORK_TYPE_TRAITS_H_ - -#include - -namespace cartographer { -namespace cloud { -namespace framework { - -template -class Stream { - using type = Request; -}; - -template