From 7d2e39af4b2645fd9e6aa28f2b87c91c372bb7f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Sch=C3=BCtte?= Date: Mon, 29 Jan 2018 12:01:17 +0100 Subject: [PATCH] gRPC handler testing (#857) --- cartographer_grpc/framework/server.h | 11 +- .../testing/rpc_handler_test_server.h | 178 ++++++++++++++++++ .../framework/testing/rpc_handler_wrapper.h | 58 ++++++ 3 files changed, 243 insertions(+), 4 deletions(-) create mode 100644 cartographer_grpc/framework/testing/rpc_handler_test_server.h create mode 100644 cartographer_grpc/framework/testing/rpc_handler_wrapper.h diff --git a/cartographer_grpc/framework/server.h b/cartographer_grpc/framework/server.h index 26b9ffb..0dac235 100644 --- a/cartographer_grpc/framework/server.h +++ b/cartographer_grpc/framework/server.h @@ -41,7 +41,7 @@ class Server { struct Options { size_t num_grpc_threads; size_t num_event_threads; - std::string server_address = "0.0.0.0:50051"; + std::string server_address; }; public: @@ -84,6 +84,7 @@ class Server { std::map rpc_handlers_; }; friend class Builder; + virtual ~Server() = default; // Starts a server starts serving the registered services. void Start(); @@ -104,13 +105,15 @@ class Server { return {execution_context_->lock(), execution_context_.get()}; } - private: + protected: Server(const Options& options); - Server(const Server&) = delete; - Server& operator=(const Server&) = delete; void AddService( const std::string& service_name, const std::map& rpc_handler_infos); + + private: + Server(const Server&) = delete; + Server& operator=(const Server&) = delete; void RunCompletionQueue(::grpc::ServerCompletionQueue* completion_queue); void RunEventQueue(Rpc::EventQueue* event_queue); Rpc::EventQueue* SelectNextEventQueueRoundRobin(); diff --git a/cartographer_grpc/framework/testing/rpc_handler_test_server.h b/cartographer_grpc/framework/testing/rpc_handler_test_server.h new file mode 100644 index 0000000..6a8a7c4 --- /dev/null +++ b/cartographer_grpc/framework/testing/rpc_handler_test_server.h @@ -0,0 +1,178 @@ +/* + * Copyright 2018 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_TESTING_RPC_HANDLER_TEST_SERVER_H_ +#define CARTOGRAPHER_GRPC_FRAMEWORK_TESTING_RPC_HANDLER_TEST_SERVER_H_ + +#include +#include + +#include "cartographer/common/blocking_queue.h" +#include "cartographer_grpc/framework/server.h" +#include "cartographer_grpc/framework/testing/rpc_handler_wrapper.h" + +namespace cartographer_grpc { +namespace framework { +namespace testing { + +namespace { +const std::string kMethodName = "method"; +const std::string kServiceName = "service"; +const std::string kFullyQualifiedMethodName = + "/" + kServiceName + "/" + kMethodName; +const std::string kServerAddress = "localhost:50051"; +} // namespace + +template +class RpcHandlerTestServer : public Server { + public: + RpcHandlerTestServer() : Server(Options{1, 1, kServerAddress}) { + // Register the handler under test. + this->AddService(kServiceName, {{kMethodName, GetRpcHandlerInfo()}}); + // Starts the server and instantiates the handler under test. + this->Start(); + // Depending on the RPC type might already trigger a NEW_CONNECTION + // event in the handler under test. + InstantiateReadersAndWriters(); + } + + ~RpcHandlerTestServer() { this->Shutdown(); }; + + // Parses a request message from the passed string and issues the + // request against the handler, waits for the handler to complete + // processing before returning. + void SendWrite(const std::string &serialized_message) { + auto message = cartographer::common::make_unique< + typename RpcHandlerType::RequestType>(); + message->ParseFromString(serialized_message); + switch (rpc_method_.method_type()) { + case ::grpc::internal::RpcMethod::CLIENT_STREAMING: + CHECK(client_writer_->Write(*message)) << "Write failed."; + break; + case ::grpc::internal::RpcMethod::BIDI_STREAMING: + case ::grpc::internal::RpcMethod::SERVER_STREAMING: + case ::grpc::internal::RpcMethod::NORMAL_RPC: + LOG(FATAL) << "Not implemented"; + break; + } + WaitForHandlerCompletion(RpcHandlerWrapper::ON_REQUEST); + } + + // Sends a WRITES_DONE event to the handler, waits for the handler + // to finish processing the READS_DONE event before returning. + void SendWritesDone() { + auto message = cartographer::common::make_unique< + typename RpcHandlerType::RequestType>(); + switch (rpc_method_.method_type()) { + case ::grpc::internal::RpcMethod::CLIENT_STREAMING: + CHECK(client_writer_->WritesDone()) << "WritesDone failed."; + break; + case ::grpc::internal::RpcMethod::BIDI_STREAMING: + case ::grpc::internal::RpcMethod::SERVER_STREAMING: + case ::grpc::internal::RpcMethod::NORMAL_RPC: + LOG(FATAL) << "Not implemented"; + break; + } + WaitForHandlerCompletion(RpcHandlerWrapper::ON_READS_DONE); + } + + // Sends a FINISH event to the handler under test, waits for the + // handler to finish processing the event before returning. + void SendFinish() { + switch (rpc_method_.method_type()) { + case ::grpc::internal::RpcMethod::CLIENT_STREAMING: + CHECK(client_writer_->Finish()) << "Finish failed."; + break; + case ::grpc::internal::RpcMethod::BIDI_STREAMING: + case ::grpc::internal::RpcMethod::SERVER_STREAMING: + case ::grpc::internal::RpcMethod::NORMAL_RPC: + LOG(FATAL) << "Not implemented"; + break; + } + WaitForHandlerCompletion(RpcHandlerWrapper::ON_FINISH); + } + + private: + using ClientWriter = ::grpc::internal::ClientWriterFactory< + typename RpcHandlerType::RequestType>; + + void WaitForHandlerCompletion(RpcHandlerWrapper::RpcHandlerEvent event) { + CHECK_EQ(rpc_handler_event_queue_.Pop(), event); + } + + void InstantiateReadersAndWriters() { + switch (rpc_method_.method_type()) { + case ::grpc::internal::RpcMethod::CLIENT_STREAMING: + if (!response_) { + response_ = cartographer::common::make_unique< + typename RpcHandlerType::ResponseType>(); + } + if (!client_writer_) { + client_writer_ = CreateClientWriter(); + } + break; + case ::grpc::internal::RpcMethod::SERVER_STREAMING: + case ::grpc::internal::RpcMethod::NORMAL_RPC: + case ::grpc::internal::RpcMethod::BIDI_STREAMING: + LOG(FATAL) << "Not implemented"; + break; + } + } + + std::unique_ptr CreateClientWriter() { + return std::unique_ptr(ClientWriter::Create( + channel_.get(), rpc_method_, &context_, response_.get())); + } + + RpcHandlerInfo GetRpcHandlerInfo() { + ::grpc::internal::RpcMethod::RpcType rpc_type = + RpcType::value; + auto event_callback = + [this](RpcHandlerWrapper::RpcHandlerEvent event) { + rpc_handler_event_queue_.Push(event); + }; + auto handler_instantiator = [this]( + Rpc *const rpc, + ExecutionContext *const execution_context) { + std::unique_ptr rpc_handler = + cartographer::common::make_unique>( + event_callback); + rpc_handler->SetRpc(rpc); + rpc_handler->SetExecutionContext(execution_context); + return rpc_handler; + }; + return RpcHandlerInfo{ + RpcHandlerType::RequestType::default_instance().GetDescriptor(), + RpcHandlerType::ResponseType::default_instance().GetDescriptor(), + handler_instantiator, rpc_type, kFullyQualifiedMethodName}; + } + + ::grpc::internal::RpcMethod rpc_method_; + ::grpc::ClientContext context_; + std::shared_ptr<::grpc::ChannelInterface> channel_; + std::unique_ptr<::grpc::ClientWriter> + client_writer_; + std::unique_ptr response_; + cartographer::common::BlockingQueue + rpc_handler_event_queue_; +}; + +} // namespace testing +} // namespace framework +} // namespace cartographer_grpc + +#endif // CARTOGRAPHER_GRPC_FRAMEWORK_TESTING_RPC_HANDLER_TEST_SERVER_H_ diff --git a/cartographer_grpc/framework/testing/rpc_handler_wrapper.h b/cartographer_grpc/framework/testing/rpc_handler_wrapper.h new file mode 100644 index 0000000..db67fb1 --- /dev/null +++ b/cartographer_grpc/framework/testing/rpc_handler_wrapper.h @@ -0,0 +1,58 @@ +/* + * Copyright 2018 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_TESTING_RPC_HANDLER_WRAPPER_H_ +#define CARTOGRAPHER_GRPC_FRAMEWORK_TESTING_RPC_HANDLER_WRAPPER_H_ + +#include + +namespace cartographer_grpc { +namespace framework { +namespace testing { + +template +class RpcHandlerWrapper : public RpcHandlerType { + public: + enum RpcHandlerEvent { ON_REQUEST, ON_READS_DONE, ON_FINISH }; + using EventCallback = std::function; + + RpcHandlerWrapper(EventCallback event_callback) + : event_callback_(event_callback) {} + + void OnRequest(const typename RpcHandlerType::RequestType &request) override { + RpcHandlerType::OnRequest(request); + event_callback_(ON_REQUEST); + } + + void OnReadsDone() override { + RpcHandlerType::OnReadsDone(); + event_callback_(ON_READS_DONE); + } + + void OnFinish() override { + RpcHandlerType::OnFinish(); + event_callback_(ON_FINISH); + } + + private: + EventCallback event_callback_; +}; + +} // namespace testing +} // namespace framework +} // namespace cartographer_grpc + +#endif // CARTOGRAPHER_GRPC_FRAMEWORK_TESTING_RPC_HANDLER_WRAPPER_H_