diff --git a/cartographer_grpc/framework/rpc.cc b/cartographer_grpc/framework/rpc.cc new file mode 100644 index 0000000..4fbc363 --- /dev/null +++ b/cartographer_grpc/framework/rpc.cc @@ -0,0 +1,79 @@ +/* + * Copyright 2017 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cartographer_grpc/framework/rpc.h" + +#include "glog/logging.h" +#include "grpc++/impl/codegen/service_type.h" + +namespace cartographer_grpc { +namespace framework { + +Rpc::Rpc(const RpcHandlerInfo& rpc_handler_info) + : rpc_handler_info_(rpc_handler_info) {} + +::grpc::internal::RpcMethod::RpcType Rpc::rpc_type() const { + return rpc_handler_info_.rpc_type; +} + +::grpc::internal::ServerAsyncStreamingInterface* Rpc::responder() { + LOG(FATAL) << "Not yet implemented"; + return nullptr; +} + +Rpc::RpcState* Rpc::GetState(State state) { + switch (state) { + case State::NEW_CONNECTION: + return &new_connection_state_; + case State::READ: + return &read_state_; + case State::WRITE: + return &write_state_; + case State::DONE: + return &done_state_; + } + LOG(FATAL) << "Never reached."; +} + +ActiveRpcs::ActiveRpcs() : lock_() {} + +ActiveRpcs::~ActiveRpcs() { + cartographer::common::MutexLocker locker(&lock_); + if (!rpcs_.empty()) { + LOG(FATAL) << "RPCs still in flight!"; + } +} + +Rpc* ActiveRpcs::Add(std::unique_ptr rpc) { + cartographer::common::MutexLocker locker(&lock_); + const auto result = rpcs_.emplace(rpc.release()); + CHECK(result.second) << "RPC already active."; + return *result.first; +} + +bool ActiveRpcs::Remove(Rpc* rpc) { + cartographer::common::MutexLocker locker(&lock_); + auto it = rpcs_.find(rpc); + if (it != rpcs_.end()) { + delete rpc; + rpcs_.erase(it); + return true; + } + return false; +} + +} // namespace framework +} // namespace cartographer_grpc diff --git a/cartographer_grpc/framework/rpc.h b/cartographer_grpc/framework/rpc.h new file mode 100644 index 0000000..0314e0d --- /dev/null +++ b/cartographer_grpc/framework/rpc.h @@ -0,0 +1,84 @@ +/* + * Copyright 2017 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_RPC_H +#define CARTOGRAPHER_GRPC_FRAMEWORK_RPC_H + +#include +#include + +#include "cartographer/common/mutex.h" +#include "cartographer_grpc/framework/rpc_handler.h" +#include "grpc++/grpc++.h" + +namespace cartographer_grpc { +namespace framework { + +class Service; +class Rpc { + public: + enum class State { NEW_CONNECTION = 0, READ, WRITE, DONE }; + struct RpcState { + const State state; + Rpc* rpc; + }; + + Rpc(const RpcHandlerInfo& rpc_handler_info); + + ::grpc::internal::RpcMethod::RpcType rpc_type() const; + ::grpc::ServerContext* server_context() { return &server_context_; } + ::grpc::internal::ServerAsyncStreamingInterface* responder(); + RpcState* GetState(State state); + + ::google::protobuf::Message* request() { return request_.get(); } + ::google::protobuf::Message* response() { return response_.get(); } + + private: + Rpc(const Rpc&) = delete; + Rpc& operator=(const Rpc&) = delete; + + RpcHandlerInfo rpc_handler_info_; + ::grpc::ServerContext server_context_; + + RpcState new_connection_state_ = RpcState{State::NEW_CONNECTION, this}; + RpcState read_state_ = RpcState{State::READ, this}; + RpcState write_state_ = RpcState{State::WRITE, this}; + RpcState done_state_ = RpcState{State::DONE, this}; + + std::unique_ptr request_; + std::unique_ptr response_; +}; + +// This class keeps track of all in-flight RPCs for a 'Service'. Make sure that +// all RPCs have been terminated and removed from this object before it goes out +// of scope. +class ActiveRpcs { + public: + ActiveRpcs(); + ~ActiveRpcs() EXCLUDES(lock_); + + Rpc* Add(std::unique_ptr rpc) EXCLUDES(lock_); + bool Remove(Rpc* rpc) EXCLUDES(lock_); + + private: + cartographer::common::Mutex lock_; + std::unordered_set rpcs_; +}; + +} // namespace framework +} // namespace cartographer_grpc + +#endif // CARTOGRAPHER_GRPC_FRAMEWORK_RPC_H diff --git a/cartographer_grpc/framework/rpc_handler.h b/cartographer_grpc/framework/rpc_handler.h index 9965dbe..e5f4668 100644 --- a/cartographer_grpc/framework/rpc_handler.h +++ b/cartographer_grpc/framework/rpc_handler.h @@ -17,7 +17,9 @@ #ifndef CARTOGRAPHER_GRPC_FRAMEWORK_RPC_HANDLER_H #define CARTOGRAPHER_GRPC_FRAMEWORK_RPC_HANDLER_H +#include "cartographer_grpc/framework/type_traits.h" #include "google/protobuf/message.h" +#include "grpc++/grpc++.h" namespace cartographer_grpc { namespace framework { @@ -35,6 +37,16 @@ struct RpcHandlerInfo { const google::protobuf::Descriptor* request_descriptor; const google::protobuf::Descriptor* response_descriptor; const RpcHandlerFactory rpc_handler_factory; + const grpc::internal::RpcMethod::RpcType rpc_type; +}; + +template +class RpcHandler : public RpcHandlerInterface { + public: + using IncomingType = Incoming; + using OutgoingType = Outgoing; + using RequestType = StripStream; + using ResponseType = StripStream; }; } // namespace framework diff --git a/cartographer_grpc/framework/server.cc b/cartographer_grpc/framework/server.cc index 83924d2..bbaa0d6 100644 --- a/cartographer_grpc/framework/server.cc +++ b/cartographer_grpc/framework/server.cc @@ -55,7 +55,7 @@ void Server::AddService( // Instantiate and register service. const auto result = services_.emplace(std::piecewise_construct, std::make_tuple(service_name), - std::make_tuple(rpc_handler_infos)); + std::make_tuple(service_name, rpc_handler_infos)); CHECK(result.second) << "A service named " << service_name << " already exists."; server_builder_.RegisterService(&result.first->second); diff --git a/cartographer_grpc/framework/server.h b/cartographer_grpc/framework/server.h index b81a862..f49528b 100644 --- a/cartographer_grpc/framework/server.h +++ b/cartographer_grpc/framework/server.h @@ -62,7 +62,9 @@ class Server { cartographer::common::make_unique(); rpc_handler->SetRpc(rpc); return rpc_handler; - }}); + }, + RpcType::value}); } private: diff --git a/cartographer_grpc/framework/service.cc b/cartographer_grpc/framework/service.cc index 973d555..35395ac 100644 --- a/cartographer_grpc/framework/service.cc +++ b/cartographer_grpc/framework/service.cc @@ -16,11 +16,53 @@ #include "cartographer_grpc/framework/server.h" +#include "glog/logging.h" +#include "grpc++/impl/codegen/proto_utils.h" + namespace cartographer_grpc { namespace framework { -Service::Service(const std::map& rpc_handlers) - : rpc_handlers_(rpc_handlers) {} +Service::Service(const std::string& service_name, + const std::map& rpc_handler_infos) + : rpc_handler_infos_(rpc_handler_infos) { + for (const auto& rpc_handler_info : rpc_handler_infos_) { + std::string fully_qualified_method_name = + "/" + service_name + "/" + rpc_handler_info.first; + // The 'handler' below is set to 'nullptr' indicating that we want to + // handle this method asynchronously. + this->AddMethod(new grpc::internal::RpcServiceMethod( + fully_qualified_method_name.c_str(), rpc_handler_info.second.rpc_type, + nullptr /* handler */)); + } +} + +void Service::StartServing( + const std::vector<::grpc::ServerCompletionQueue*>& completion_queues) { + int i = 0; + for (const auto& rpc_handler_info : rpc_handler_infos_) { + for (auto completion_queue : completion_queues) { + Rpc* rpc = active_rpcs_.Add( + cartographer::common::make_unique(rpc_handler_info.second)); + RequestNextMethodInvocation(i, rpc, completion_queue); + } + ++i; + } +} + +void Service::RequestNextMethodInvocation( + int method_index, Rpc* rpc, + ::grpc::ServerCompletionQueue* completion_queue) { + switch (rpc->rpc_type()) { + case ::grpc::internal::RpcMethod::CLIENT_STREAMING: + RequestAsyncClientStreaming(method_index, rpc->server_context(), + rpc->responder(), completion_queue, + completion_queue, + rpc->GetState(Rpc::State::NEW_CONNECTION)); + break; + default: + LOG(FATAL) << "RPC type not implemented."; + } +} } // namespace framework } // namespace cartographer_grpc diff --git a/cartographer_grpc/framework/service.h b/cartographer_grpc/framework/service.h index fac13c1..6876bca 100644 --- a/cartographer_grpc/framework/service.h +++ b/cartographer_grpc/framework/service.h @@ -17,6 +17,7 @@ #ifndef CARTOGRAPHER_GRPC_FRAMEWORK_SERVICE_H #define CARTOGRAPHER_GRPC_FRAMEWORK_SERVICE_H +#include "cartographer_grpc/framework/rpc.h" #include "cartographer_grpc/framework/rpc_handler.h" #include "grpc++/impl/codegen/service_type.h" @@ -29,10 +30,18 @@ namespace framework { // 'Rpc' handler objects. class Service : public ::grpc::Service { public: - Service(const std::map& rpc_handlers); + Service(const std::string& service_name, + const std::map& rpc_handlers); + void StartServing( + const std::vector<::grpc::ServerCompletionQueue*>& completion_queues); private: - std::map rpc_handlers_; + void RequestNextMethodInvocation( + int method_index, Rpc* rpc, + ::grpc::ServerCompletionQueue* completion_queue); + + std::map rpc_handler_infos_; + ActiveRpcs active_rpcs_; }; } // namespace framework diff --git a/cartographer_grpc/framework/type_traits.h b/cartographer_grpc/framework/type_traits.h new file mode 100644 index 0000000..9daed7d --- /dev/null +++ b/cartographer_grpc/framework/type_traits.h @@ -0,0 +1,69 @@ +/* + * Copyright 2017 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_TYPES_H +#define CARTOGRAPHER_GRPC_FRAMEWORK_TYPES_H + +#include + +namespace cartographer_grpc { +namespace framework { + +template +class Stream { + using type = Request; +}; + +template