Heap-allocate RpcEvents. (#756)

Replace Rpc's RpcEvent members with heap-allocated RpcEvents.

[RFC=0002](https://github.com/googlecartographer/rfcs/blob/master/text/0002-cloud-based-mapping-1.md)
master
Christoph Schütte 2017-12-13 18:01:01 +01:00 committed by Wally B. Feed
parent 69787f288f
commit e16d1b1207
4 changed files with 54 additions and 43 deletions

View File

@ -49,11 +49,6 @@ Rpc::Rpc(int method_index,
execution_context_(execution_context), execution_context_(execution_context),
rpc_handler_info_(rpc_handler_info), rpc_handler_info_(rpc_handler_info),
service_(service), service_(service),
new_connection_event_{Event::NEW_CONNECTION, this, false},
read_event_{Event::READ, this, false},
write_event_{Event::WRITE, this, false},
finish_event_{Event::FINISH, this, false},
done_event_{Event::DONE, this, false},
handler_(rpc_handler_info_.rpc_handler_factory(this, execution_context)) { handler_(rpc_handler_info_.rpc_handler_factory(this, execution_context)) {
InitializeReadersAndWriters(rpc_handler_info_.rpc_type); InitializeReadersAndWriters(rpc_handler_info_.rpc_type);
@ -78,30 +73,30 @@ void Rpc::OnReadsDone() { handler_->OnReadsDone(); }
void Rpc::RequestNextMethodInvocation() { void Rpc::RequestNextMethodInvocation() {
// Ask gRPC to notify us when the connection terminates. // Ask gRPC to notify us when the connection terminates.
done_event_.pending = true; SetRpcEventState(Event::DONE, true);
server_context_.AsyncNotifyWhenDone(&done_event_); server_context_.AsyncNotifyWhenDone(new RpcEvent{Event::DONE, this});
// Make sure after terminating the connection, gRPC notifies us with this // Make sure after terminating the connection, gRPC notifies us with this
// event. // event.
new_connection_event_.pending = true; SetRpcEventState(Event::NEW_CONNECTION, true);
switch (rpc_handler_info_.rpc_type) { switch (rpc_handler_info_.rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING: case ::grpc::internal::RpcMethod::BIDI_STREAMING:
service_->RequestAsyncBidiStreaming( service_->RequestAsyncBidiStreaming(
method_index_, &server_context_, streaming_interface(), method_index_, &server_context_, streaming_interface(),
server_completion_queue_, server_completion_queue_, server_completion_queue_, server_completion_queue_,
&new_connection_event_); new RpcEvent{Event::NEW_CONNECTION, this});
break; break;
case ::grpc::internal::RpcMethod::CLIENT_STREAMING: case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
service_->RequestAsyncClientStreaming( service_->RequestAsyncClientStreaming(
method_index_, &server_context_, streaming_interface(), method_index_, &server_context_, streaming_interface(),
server_completion_queue_, server_completion_queue_, server_completion_queue_, server_completion_queue_,
&new_connection_event_); new RpcEvent{Event::NEW_CONNECTION, this});
break; break;
case ::grpc::internal::RpcMethod::NORMAL_RPC: case ::grpc::internal::RpcMethod::NORMAL_RPC:
service_->RequestAsyncUnary( service_->RequestAsyncUnary(
method_index_, &server_context_, request_.get(), method_index_, &server_context_, request_.get(),
streaming_interface(), server_completion_queue_, streaming_interface(), server_completion_queue_,
server_completion_queue_, &new_connection_event_); server_completion_queue_, new RpcEvent{Event::NEW_CONNECTION, this});
break; break;
default: default:
LOG(FATAL) << "RPC type not implemented."; LOG(FATAL) << "RPC type not implemented.";
@ -113,8 +108,9 @@ void Rpc::RequestStreamingReadIfNeeded() {
switch (rpc_handler_info_.rpc_type) { switch (rpc_handler_info_.rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING: case ::grpc::internal::RpcMethod::BIDI_STREAMING:
case ::grpc::internal::RpcMethod::CLIENT_STREAMING: case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
read_event_.pending = true; SetRpcEventState(Event::READ, true);
async_reader_interface()->Read(request_.get(), &read_event_); async_reader_interface()->Read(request_.get(),
new RpcEvent{Event::READ, this});
break; break;
case ::grpc::internal::RpcMethod::NORMAL_RPC: case ::grpc::internal::RpcMethod::NORMAL_RPC:
// For NORMAL_RPC we don't have to do anything here, since gRPC // For NORMAL_RPC we don't have to do anything here, since gRPC
@ -149,21 +145,22 @@ void Rpc::Write(std::unique_ptr<::google::protobuf::Message> message) {
void Rpc::SendFinish(std::unique_ptr<::google::protobuf::Message> message, void Rpc::SendFinish(std::unique_ptr<::google::protobuf::Message> message,
::grpc::Status status) { ::grpc::Status status) {
finish_event_.pending = true; SetRpcEventState(Event::FINISH, true);
switch (rpc_handler_info_.rpc_type) { switch (rpc_handler_info_.rpc_type) {
case ::grpc::internal::RpcMethod::BIDI_STREAMING: case ::grpc::internal::RpcMethod::BIDI_STREAMING:
CHECK(!message); CHECK(!message);
server_async_reader_writer_->Finish(status, &finish_event_); server_async_reader_writer_->Finish(status,
new RpcEvent{Event::FINISH, this});
break; break;
case ::grpc::internal::RpcMethod::CLIENT_STREAMING: case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
response_ = std::move(message); response_ = std::move(message);
SendUnaryFinish(server_async_reader_.get(), status, response_.get(), SendUnaryFinish(server_async_reader_.get(), status, response_.get(),
&finish_event_); new RpcEvent{Event::FINISH, this});
break; break;
case ::grpc::internal::RpcMethod::NORMAL_RPC: case ::grpc::internal::RpcMethod::NORMAL_RPC:
response_ = std::move(message); response_ = std::move(message);
SendUnaryFinish(server_async_response_writer_.get(), status, SendUnaryFinish(server_async_response_writer_.get(), status,
response_.get(), &finish_event_); response_.get(), new RpcEvent{Event::FINISH, this});
break; break;
default: default:
LOG(FATAL) << "RPC type not implemented."; LOG(FATAL) << "RPC type not implemented.";
@ -188,20 +185,21 @@ void Rpc::Finish(::grpc::Status status) {
} }
void Rpc::PerformWriteIfNeeded() { void Rpc::PerformWriteIfNeeded() {
if (send_queue_.empty() || write_event_.pending) { if (send_queue_.empty() || IsRpcEventPending(Event::WRITE)) {
return; return;
} }
// Make sure not other send operations are in-flight. // Make sure not other send operations are in-flight.
CHECK(!finish_event_.pending); CHECK(!IsRpcEventPending(Event::FINISH));
SendItem send_item = std::move(send_queue_.front()); SendItem send_item = std::move(send_queue_.front());
send_queue_.pop(); send_queue_.pop();
response_ = std::move(send_item.msg); response_ = std::move(send_item.msg);
if (response_) { if (response_) {
write_event_.pending = true; SetRpcEventState(Event::WRITE, true);
async_writer_interface()->Write(*response_.get(), &write_event_); async_writer_interface()->Write(*response_.get(),
new RpcEvent{Event::WRITE, this});
} else { } else {
CHECK(send_queue_.empty()); CHECK(send_queue_.empty());
SendFinish(nullptr /* message */, send_item.status); SendFinish(nullptr /* message */, send_item.status);
@ -253,22 +251,35 @@ Rpc::async_writer_interface() {
LOG(FATAL) << "Never reached."; LOG(FATAL) << "Never reached.";
} }
Rpc::RpcEvent* Rpc::GetRpcEvent(Event event) { bool* Rpc::GetRpcEventState(Event event) {
switch (event) { switch (event) {
case Event::DONE: case Event::DONE:
return &done_event_; return &done_event_pending_;
case Event::FINISH: case Event::FINISH:
return &finish_event_; return &finish_event_pending_;
case Event::NEW_CONNECTION: case Event::NEW_CONNECTION:
return &new_connection_event_; return &new_connection_event_pending_;
case Event::READ: case Event::READ:
return &read_event_; return &read_event_pending_;
case Event::WRITE: case Event::WRITE:
return &write_event_; return &write_event_pending_;
} }
LOG(FATAL) << "Never reached."; LOG(FATAL) << "Never reached.";
} }
void Rpc::SetRpcEventState(Event event, bool pending) {
*GetRpcEventState(event) = pending;
}
bool Rpc::IsRpcEventPending(Event event) { return *GetRpcEventState(event); }
bool Rpc::IsAnyEventPending() {
return IsRpcEventPending(Rpc::Event::DONE) ||
IsRpcEventPending(Rpc::Event::READ) ||
IsRpcEventPending(Rpc::Event::WRITE) ||
IsRpcEventPending(Rpc::Event::FINISH);
}
ActiveRpcs::ActiveRpcs() : lock_() {} ActiveRpcs::ActiveRpcs() : lock_() {}
void Rpc::InitializeReadersAndWriters( void Rpc::InitializeReadersAndWriters(

View File

@ -41,11 +41,6 @@ class Rpc {
struct RpcEvent { struct RpcEvent {
const Event event; const Event event;
Rpc* rpc; Rpc* rpc;
// Indicates whether the event is pending completion. E.g. 'event = READ'
// and 'pending = true' means that a read has been requested but hasn't
// completed yet. While 'pending = false' indicates, that the read has
// completed and currently no read is in-flight.
bool pending;
}; };
Rpc(int method_index, ::grpc::ServerCompletionQueue* server_completion_queue, Rpc(int method_index, ::grpc::ServerCompletionQueue* server_completion_queue,
@ -60,7 +55,9 @@ class Rpc {
void Write(std::unique_ptr<::google::protobuf::Message> message); void Write(std::unique_ptr<::google::protobuf::Message> message);
void Finish(::grpc::Status status); void Finish(::grpc::Status status);
Service* service() { return service_; } Service* service() { return service_; }
RpcEvent* GetRpcEvent(Event event); void SetRpcEventState(Event event, bool pending);
bool IsRpcEventPending(Event event);
bool IsAnyEventPending();
private: private:
struct SendItem { struct SendItem {
@ -74,6 +71,7 @@ class Rpc {
::grpc::internal::RpcMethod::RpcType rpc_type); ::grpc::internal::RpcMethod::RpcType rpc_type);
void SendFinish(std::unique_ptr<::google::protobuf::Message> message, void SendFinish(std::unique_ptr<::google::protobuf::Message> message,
::grpc::Status status); ::grpc::Status status);
bool* GetRpcEventState(Event event);
::grpc::internal::AsyncReaderInterface<::google::protobuf::Message>* ::grpc::internal::AsyncReaderInterface<::google::protobuf::Message>*
async_reader_interface(); async_reader_interface();
@ -89,11 +87,15 @@ class Rpc {
Service* service_; Service* service_;
::grpc::ServerContext server_context_; ::grpc::ServerContext server_context_;
RpcEvent new_connection_event_; // These state variables indicate whether the corresponding event is currently
RpcEvent read_event_; // pending completion, e.g. 'read_event_pending_ = true' means that a read has
RpcEvent write_event_; // been requested but hasn't completed yet. 'read_event_pending_ = false'
RpcEvent finish_event_; // indicates that the read has completed and currently no read is in-flight.
RpcEvent done_event_; bool new_connection_event_pending_ = false;
bool read_event_pending_ = false;
bool write_event_pending_ = false;
bool finish_event_pending_ = false;
bool done_event_pending_ = false;
std::unique_ptr<google::protobuf::Message> request_; std::unique_ptr<google::protobuf::Message> request_;
std::unique_ptr<google::protobuf::Message> response_; std::unique_ptr<google::protobuf::Message> response_;

View File

@ -68,6 +68,7 @@ void Server::RunCompletionQueue(
auto* rpc_event = static_cast<Rpc::RpcEvent*>(tag); auto* rpc_event = static_cast<Rpc::RpcEvent*>(tag);
rpc_event->rpc->service()->HandleEvent(rpc_event->event, rpc_event->rpc, rpc_event->rpc->service()->HandleEvent(rpc_event->event, rpc_event->rpc,
ok); ok);
delete rpc_event;
} }
} }

View File

@ -52,7 +52,7 @@ void Service::StartServing(
void Service::StopServing() { shutting_down_ = true; } void Service::StopServing() { shutting_down_ = true; }
void Service::HandleEvent(Rpc::Event event, Rpc* rpc, bool ok) { void Service::HandleEvent(Rpc::Event event, Rpc* rpc, bool ok) {
rpc->GetRpcEvent(event)->pending = false; rpc->SetRpcEventState(event, false);
switch (event) { switch (event) {
case Rpc::Event::NEW_CONNECTION: case Rpc::Event::NEW_CONNECTION:
HandleNewConnection(rpc, ok); HandleNewConnection(rpc, ok);
@ -131,10 +131,7 @@ void Service::HandleFinish(Rpc* rpc, bool ok) {
void Service::HandleDone(Rpc* rpc, bool ok) { RemoveIfNotPending(rpc); } void Service::HandleDone(Rpc* rpc, bool ok) { RemoveIfNotPending(rpc); }
void Service::RemoveIfNotPending(Rpc* rpc) { void Service::RemoveIfNotPending(Rpc* rpc) {
if (!rpc->GetRpcEvent(Rpc::Event::DONE)->pending && if (!rpc->IsAnyEventPending()) {
!rpc->GetRpcEvent(Rpc::Event::READ)->pending &&
!rpc->GetRpcEvent(Rpc::Event::WRITE)->pending &&
!rpc->GetRpcEvent(Rpc::Event::FINISH)->pending) {
active_rpcs_.Remove(rpc); active_rpcs_.Remove(rpc);
} }
} }