diff --git a/cartographer_grpc/framework/rpc.cc b/cartographer_grpc/framework/rpc.cc index 9f6a67c..4f0d3d3 100644 --- a/cartographer_grpc/framework/rpc.cc +++ b/cartographer_grpc/framework/rpc.cc @@ -49,11 +49,6 @@ Rpc::Rpc(int method_index, execution_context_(execution_context), rpc_handler_info_(rpc_handler_info), service_(service), - new_connection_event_{Event::NEW_CONNECTION, this, false}, - read_event_{Event::READ, this, false}, - write_event_{Event::WRITE, this, false}, - finish_event_{Event::FINISH, this, false}, - done_event_{Event::DONE, this, false}, handler_(rpc_handler_info_.rpc_handler_factory(this, execution_context)) { InitializeReadersAndWriters(rpc_handler_info_.rpc_type); @@ -78,30 +73,30 @@ void Rpc::OnReadsDone() { handler_->OnReadsDone(); } void Rpc::RequestNextMethodInvocation() { // Ask gRPC to notify us when the connection terminates. - done_event_.pending = true; - server_context_.AsyncNotifyWhenDone(&done_event_); + SetRpcEventState(Event::DONE, true); + server_context_.AsyncNotifyWhenDone(new RpcEvent{Event::DONE, this}); // Make sure after terminating the connection, gRPC notifies us with this // event. - new_connection_event_.pending = true; + SetRpcEventState(Event::NEW_CONNECTION, true); switch (rpc_handler_info_.rpc_type) { case ::grpc::internal::RpcMethod::BIDI_STREAMING: service_->RequestAsyncBidiStreaming( method_index_, &server_context_, streaming_interface(), server_completion_queue_, server_completion_queue_, - &new_connection_event_); + new RpcEvent{Event::NEW_CONNECTION, this}); break; case ::grpc::internal::RpcMethod::CLIENT_STREAMING: service_->RequestAsyncClientStreaming( method_index_, &server_context_, streaming_interface(), server_completion_queue_, server_completion_queue_, - &new_connection_event_); + new RpcEvent{Event::NEW_CONNECTION, this}); break; case ::grpc::internal::RpcMethod::NORMAL_RPC: service_->RequestAsyncUnary( method_index_, &server_context_, request_.get(), streaming_interface(), server_completion_queue_, - server_completion_queue_, &new_connection_event_); + server_completion_queue_, new RpcEvent{Event::NEW_CONNECTION, this}); break; default: LOG(FATAL) << "RPC type not implemented."; @@ -113,8 +108,9 @@ void Rpc::RequestStreamingReadIfNeeded() { switch (rpc_handler_info_.rpc_type) { case ::grpc::internal::RpcMethod::BIDI_STREAMING: case ::grpc::internal::RpcMethod::CLIENT_STREAMING: - read_event_.pending = true; - async_reader_interface()->Read(request_.get(), &read_event_); + SetRpcEventState(Event::READ, true); + async_reader_interface()->Read(request_.get(), + new RpcEvent{Event::READ, this}); break; case ::grpc::internal::RpcMethod::NORMAL_RPC: // For NORMAL_RPC we don't have to do anything here, since gRPC @@ -149,21 +145,22 @@ void Rpc::Write(std::unique_ptr<::google::protobuf::Message> message) { void Rpc::SendFinish(std::unique_ptr<::google::protobuf::Message> message, ::grpc::Status status) { - finish_event_.pending = true; + SetRpcEventState(Event::FINISH, true); switch (rpc_handler_info_.rpc_type) { case ::grpc::internal::RpcMethod::BIDI_STREAMING: CHECK(!message); - server_async_reader_writer_->Finish(status, &finish_event_); + server_async_reader_writer_->Finish(status, + new RpcEvent{Event::FINISH, this}); break; case ::grpc::internal::RpcMethod::CLIENT_STREAMING: response_ = std::move(message); SendUnaryFinish(server_async_reader_.get(), status, response_.get(), - &finish_event_); + new RpcEvent{Event::FINISH, this}); break; case ::grpc::internal::RpcMethod::NORMAL_RPC: response_ = std::move(message); SendUnaryFinish(server_async_response_writer_.get(), status, - response_.get(), &finish_event_); + response_.get(), new RpcEvent{Event::FINISH, this}); break; default: LOG(FATAL) << "RPC type not implemented."; @@ -188,20 +185,21 @@ void Rpc::Finish(::grpc::Status status) { } void Rpc::PerformWriteIfNeeded() { - if (send_queue_.empty() || write_event_.pending) { + if (send_queue_.empty() || IsRpcEventPending(Event::WRITE)) { return; } // Make sure not other send operations are in-flight. - CHECK(!finish_event_.pending); + CHECK(!IsRpcEventPending(Event::FINISH)); SendItem send_item = std::move(send_queue_.front()); send_queue_.pop(); response_ = std::move(send_item.msg); if (response_) { - write_event_.pending = true; - async_writer_interface()->Write(*response_.get(), &write_event_); + SetRpcEventState(Event::WRITE, true); + async_writer_interface()->Write(*response_.get(), + new RpcEvent{Event::WRITE, this}); } else { CHECK(send_queue_.empty()); SendFinish(nullptr /* message */, send_item.status); @@ -253,22 +251,35 @@ Rpc::async_writer_interface() { LOG(FATAL) << "Never reached."; } -Rpc::RpcEvent* Rpc::GetRpcEvent(Event event) { +bool* Rpc::GetRpcEventState(Event event) { switch (event) { case Event::DONE: - return &done_event_; + return &done_event_pending_; case Event::FINISH: - return &finish_event_; + return &finish_event_pending_; case Event::NEW_CONNECTION: - return &new_connection_event_; + return &new_connection_event_pending_; case Event::READ: - return &read_event_; + return &read_event_pending_; case Event::WRITE: - return &write_event_; + return &write_event_pending_; } LOG(FATAL) << "Never reached."; } +void Rpc::SetRpcEventState(Event event, bool pending) { + *GetRpcEventState(event) = pending; +} + +bool Rpc::IsRpcEventPending(Event event) { return *GetRpcEventState(event); } + +bool Rpc::IsAnyEventPending() { + return IsRpcEventPending(Rpc::Event::DONE) || + IsRpcEventPending(Rpc::Event::READ) || + IsRpcEventPending(Rpc::Event::WRITE) || + IsRpcEventPending(Rpc::Event::FINISH); +} + ActiveRpcs::ActiveRpcs() : lock_() {} void Rpc::InitializeReadersAndWriters( diff --git a/cartographer_grpc/framework/rpc.h b/cartographer_grpc/framework/rpc.h index 5912a27..2afe96a 100644 --- a/cartographer_grpc/framework/rpc.h +++ b/cartographer_grpc/framework/rpc.h @@ -41,11 +41,6 @@ class Rpc { struct RpcEvent { const Event event; Rpc* rpc; - // Indicates whether the event is pending completion. E.g. 'event = READ' - // and 'pending = true' means that a read has been requested but hasn't - // completed yet. While 'pending = false' indicates, that the read has - // completed and currently no read is in-flight. - bool pending; }; Rpc(int method_index, ::grpc::ServerCompletionQueue* server_completion_queue, @@ -60,7 +55,9 @@ class Rpc { void Write(std::unique_ptr<::google::protobuf::Message> message); void Finish(::grpc::Status status); Service* service() { return service_; } - RpcEvent* GetRpcEvent(Event event); + void SetRpcEventState(Event event, bool pending); + bool IsRpcEventPending(Event event); + bool IsAnyEventPending(); private: struct SendItem { @@ -74,6 +71,7 @@ class Rpc { ::grpc::internal::RpcMethod::RpcType rpc_type); void SendFinish(std::unique_ptr<::google::protobuf::Message> message, ::grpc::Status status); + bool* GetRpcEventState(Event event); ::grpc::internal::AsyncReaderInterface<::google::protobuf::Message>* async_reader_interface(); @@ -89,11 +87,15 @@ class Rpc { Service* service_; ::grpc::ServerContext server_context_; - RpcEvent new_connection_event_; - RpcEvent read_event_; - RpcEvent write_event_; - RpcEvent finish_event_; - RpcEvent done_event_; + // These state variables indicate whether the corresponding event is currently + // pending completion, e.g. 'read_event_pending_ = true' means that a read has + // been requested but hasn't completed yet. 'read_event_pending_ = false' + // indicates that the read has completed and currently no read is in-flight. + bool new_connection_event_pending_ = false; + bool read_event_pending_ = false; + bool write_event_pending_ = false; + bool finish_event_pending_ = false; + bool done_event_pending_ = false; std::unique_ptr request_; std::unique_ptr response_; diff --git a/cartographer_grpc/framework/server.cc b/cartographer_grpc/framework/server.cc index 00e13c0..8b978e4 100644 --- a/cartographer_grpc/framework/server.cc +++ b/cartographer_grpc/framework/server.cc @@ -68,6 +68,7 @@ void Server::RunCompletionQueue( auto* rpc_event = static_cast(tag); rpc_event->rpc->service()->HandleEvent(rpc_event->event, rpc_event->rpc, ok); + delete rpc_event; } } diff --git a/cartographer_grpc/framework/service.cc b/cartographer_grpc/framework/service.cc index 7496e04..64ef53e 100644 --- a/cartographer_grpc/framework/service.cc +++ b/cartographer_grpc/framework/service.cc @@ -52,7 +52,7 @@ void Service::StartServing( void Service::StopServing() { shutting_down_ = true; } void Service::HandleEvent(Rpc::Event event, Rpc* rpc, bool ok) { - rpc->GetRpcEvent(event)->pending = false; + rpc->SetRpcEventState(event, false); switch (event) { case Rpc::Event::NEW_CONNECTION: HandleNewConnection(rpc, ok); @@ -131,10 +131,7 @@ void Service::HandleFinish(Rpc* rpc, bool ok) { void Service::HandleDone(Rpc* rpc, bool ok) { RemoveIfNotPending(rpc); } void Service::RemoveIfNotPending(Rpc* rpc) { - if (!rpc->GetRpcEvent(Rpc::Event::DONE)->pending && - !rpc->GetRpcEvent(Rpc::Event::READ)->pending && - !rpc->GetRpcEvent(Rpc::Event::WRITE)->pending && - !rpc->GetRpcEvent(Rpc::Event::FINISH)->pending) { + if (!rpc->IsAnyEventPending()) { active_rpcs_.Remove(rpc); } }