Implement server-streaming RPCs and add unittest (#768)

[RFC=0002](https://github.com/googlecartographer/rfcs/blob/master/text/0002-cloud-based-mapping-1.md)
master
Christoph Schütte 2017-12-19 10:27:30 +01:00 committed by Wally B. Feed
parent c79425cbb0
commit dea6c3d7ce
4 changed files with 75 additions and 17 deletions

View File

@ -40,10 +40,19 @@ message GetEchoResponse {
int32 output = 1; int32 output = 1;
} }
message GetSequenceRequest {
int32 input = 1;
}
message GetSequenceResponse {
int32 output = 1;
}
// Provides information about the gRPC server. // Provides information about the gRPC server.
service Math { service Math {
rpc GetSum(stream GetSumRequest) returns (GetSumResponse); rpc GetSum(stream GetSumRequest) returns (GetSumResponse);
rpc GetSquare(GetSquareRequest) returns (GetSquareResponse); rpc GetSquare(GetSquareRequest) returns (GetSquareResponse);
rpc GetRunningSum(stream GetSumRequest) returns (stream GetSumResponse); rpc GetRunningSum(stream GetSumRequest) returns (stream GetSumResponse);
rpc GetEcho(GetEchoRequest) returns (GetEchoResponse); rpc GetEcho(GetEchoRequest) returns (GetEchoResponse);
rpc GetSequence(GetSequenceRequest) returns (stream GetSequenceResponse);
} }

View File

@ -103,8 +103,13 @@ void Rpc::RequestNextMethodInvocation() {
server_completion_queue_, server_completion_queue_,
new RpcEvent{Event::NEW_CONNECTION, weak_ptr_factory_(this), true}); new RpcEvent{Event::NEW_CONNECTION, weak_ptr_factory_(this), true});
break; break;
default: case ::grpc::internal::RpcMethod::SERVER_STREAMING:
LOG(FATAL) << "RPC type not implemented."; service_->RequestAsyncServerStreaming(
method_index_, &server_context_, request_.get(),
streaming_interface(), server_completion_queue_,
server_completion_queue_,
new RpcEvent{Event::NEW_CONNECTION, weak_ptr_factory_(this), true});
break;
} }
} }
@ -119,14 +124,13 @@ void Rpc::RequestStreamingReadIfNeeded() {
new RpcEvent{Event::READ, weak_ptr_factory_(this), true}); new RpcEvent{Event::READ, weak_ptr_factory_(this), true});
break; break;
case ::grpc::internal::RpcMethod::NORMAL_RPC: case ::grpc::internal::RpcMethod::NORMAL_RPC:
// For NORMAL_RPC we don't have to do anything here, since gRPC case ::grpc::internal::RpcMethod::SERVER_STREAMING:
// automatically issues a READ request and places the request into the // For NORMAL_RPC and SERVER_STREAMING we don't need to queue an event,
// 'Message' we provided to 'RequestAsyncUnary' above. // since gRPC automatically issues a READ request and places the request
// into the 'Message' we provided to 'RequestAsyncUnary' above.
OnRequest(); OnRequest();
OnReadsDone(); OnReadsDone();
break; break;
default:
LOG(FATAL) << "RPC type not implemented.";
} }
} }
@ -172,8 +176,8 @@ void Rpc::HandleSendQueue() {
return server_async_reader_.get(); return server_async_reader_.get();
case ::grpc::internal::RpcMethod::NORMAL_RPC: case ::grpc::internal::RpcMethod::NORMAL_RPC:
return server_async_response_writer_.get(); return server_async_response_writer_.get();
default: case ::grpc::internal::RpcMethod::SERVER_STREAMING:
LOG(FATAL) << "RPC type not implemented."; return server_async_writer_.get();
} }
LOG(FATAL) << "Never reached."; LOG(FATAL) << "Never reached.";
} }
@ -187,8 +191,9 @@ Rpc::async_reader_interface() {
return server_async_reader_.get(); return server_async_reader_.get();
case ::grpc::internal::RpcMethod::NORMAL_RPC: case ::grpc::internal::RpcMethod::NORMAL_RPC:
LOG(FATAL) << "For NORMAL_RPC no streaming reader interface exists."; LOG(FATAL) << "For NORMAL_RPC no streaming reader interface exists.";
default: case ::grpc::internal::RpcMethod::SERVER_STREAMING:
LOG(FATAL) << "RPC type not implemented."; LOG(FATAL)
<< "For SERVER_STREAMING no streaming reader interface exists.";
} }
LOG(FATAL) << "Never reached."; LOG(FATAL) << "Never reached.";
} }
@ -203,8 +208,8 @@ Rpc::async_writer_interface() {
LOG(FATAL) << "For NORMAL_RPC and CLIENT_STREAMING no streaming writer " LOG(FATAL) << "For NORMAL_RPC and CLIENT_STREAMING no streaming writer "
"interface exists."; "interface exists.";
break; break;
default: case ::grpc::internal::RpcMethod::SERVER_STREAMING:
LOG(FATAL) << "RPC type not implemented."; return server_async_writer_.get();
} }
LOG(FATAL) << "Never reached."; LOG(FATAL) << "Never reached.";
} }
@ -253,8 +258,11 @@ void Rpc::PerformFinish(std::unique_ptr<::google::protobuf::Message> message,
server_async_response_writer_.get(), status, response_.get(), server_async_response_writer_.get(), status, response_.get(),
new RpcEvent{Event::FINISH, weak_ptr_factory_(this), true}); new RpcEvent{Event::FINISH, weak_ptr_factory_(this), true});
break; break;
default: case ::grpc::internal::RpcMethod::SERVER_STREAMING:
LOG(FATAL) << "RPC type not implemented."; CHECK(!message);
server_async_writer_->Finish(
status, new RpcEvent{Event::FINISH, weak_ptr_factory_(this), true});
break;
} }
} }
@ -307,8 +315,11 @@ void Rpc::InitializeReadersAndWriters(
::grpc::ServerAsyncResponseWriter<google::protobuf::Message>>( ::grpc::ServerAsyncResponseWriter<google::protobuf::Message>>(
&server_context_); &server_context_);
break; break;
default: case ::grpc::internal::RpcMethod::SERVER_STREAMING:
LOG(FATAL) << "RPC type not implemented."; server_async_writer_ = cartographer::common::make_unique<
::grpc::ServerAsyncWriter<google::protobuf::Message>>(
&server_context_);
break;
} }
} }

View File

@ -132,6 +132,8 @@ class Rpc {
std::unique_ptr<::grpc::ServerAsyncReaderWriter<google::protobuf::Message, std::unique_ptr<::grpc::ServerAsyncReaderWriter<google::protobuf::Message,
google::protobuf::Message>> google::protobuf::Message>>
server_async_reader_writer_; server_async_reader_writer_;
std::unique_ptr<::grpc::ServerAsyncWriter<google::protobuf::Message>>
server_async_writer_;
cartographer::common::Mutex send_queue_lock_; cartographer::common::Mutex send_queue_lock_;
std::queue<SendItem> send_queue_; std::queue<SendItem> send_queue_;

View File

@ -101,6 +101,21 @@ class GetEchoHandler
} }
}; };
class GetSequenceHandler
: public RpcHandler<proto::GetSequenceRequest,
Stream<proto::GetSequenceResponse>> {
public:
void OnRequest(const proto::GetSequenceRequest& request) override {
for (int i = 0; i < request.input(); ++i) {
auto response =
cartographer::common::make_unique<proto::GetSequenceResponse>();
response->set_output(i);
Send(std::move(response));
}
Finish(::grpc::Status::OK);
}
};
// TODO(cschuet): Due to the hard-coded part these tests will become flaky when // TODO(cschuet): Due to the hard-coded part these tests will become flaky when
// run in parallel. It would be nice to find a way to solve that. gRPC also // run in parallel. It would be nice to find a way to solve that. gRPC also
// allows to communicate over UNIX domain sockets. // allows to communicate over UNIX domain sockets.
@ -119,6 +134,8 @@ class ServerTest : public ::testing::Test {
server_builder.RegisterHandler<GetRunningSumHandler, proto::Math>( server_builder.RegisterHandler<GetRunningSumHandler, proto::Math>(
"GetRunningSum"); "GetRunningSum");
server_builder.RegisterHandler<GetEchoHandler, proto::Math>("GetEcho"); server_builder.RegisterHandler<GetEchoHandler, proto::Math>("GetEcho");
server_builder.RegisterHandler<GetSequenceHandler, proto::Math>(
"GetSequence");
server_ = server_builder.Build(); server_ = server_builder.Build();
client_channel_ = client_channel_ =
@ -189,6 +206,7 @@ TEST_F(ServerTest, ProcessBidiStreamingRpcTest) {
expected_responses.pop_front(); expected_responses.pop_front();
} }
EXPECT_TRUE(expected_responses.empty()); EXPECT_TRUE(expected_responses.empty());
EXPECT_TRUE(reader_writer->Finish().ok());
server_->Shutdown(); server_->Shutdown();
} }
@ -219,6 +237,24 @@ TEST_F(ServerTest, WriteFromOtherThread) {
server_->Shutdown(); server_->Shutdown();
} }
TEST_F(ServerTest, ProcessServerStreamingRpcTest) {
server_->Start();
proto::GetSequenceRequest request;
request.set_input(12);
auto reader = stub_->GetSequence(&client_context_, request);
proto::GetSequenceResponse response;
for (int i = 0; i < 12; ++i) {
EXPECT_TRUE(reader->Read(&response));
EXPECT_EQ(response.output(), i);
}
EXPECT_FALSE(reader->Read(&response));
EXPECT_TRUE(reader->Finish().ok());
server_->Shutdown();
}
} // namespace } // namespace
} // namespace framework } // namespace framework
} // namespace cartographer_grpc } // namespace cartographer_grpc