Introduce RPC class and start wiring up in Service (#701)

[RFC=0002](https://github.com/googlecartographer/rfcs/blob/master/text/0002-cloud-based-mapping-1.md)
master
Christoph Schütte 2017-11-24 23:41:58 +01:00 committed by Wally B. Feed
parent f6a7dfd07b
commit cd289bbcee
10 changed files with 347 additions and 9 deletions

View File

@ -0,0 +1,79 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer_grpc/framework/rpc.h"
#include "glog/logging.h"
#include "grpc++/impl/codegen/service_type.h"
namespace cartographer_grpc {
namespace framework {
Rpc::Rpc(const RpcHandlerInfo& rpc_handler_info)
: rpc_handler_info_(rpc_handler_info) {}
::grpc::internal::RpcMethod::RpcType Rpc::rpc_type() const {
return rpc_handler_info_.rpc_type;
}
::grpc::internal::ServerAsyncStreamingInterface* Rpc::responder() {
LOG(FATAL) << "Not yet implemented";
return nullptr;
}
Rpc::RpcState* Rpc::GetState(State state) {
switch (state) {
case State::NEW_CONNECTION:
return &new_connection_state_;
case State::READ:
return &read_state_;
case State::WRITE:
return &write_state_;
case State::DONE:
return &done_state_;
}
LOG(FATAL) << "Never reached.";
}
ActiveRpcs::ActiveRpcs() : lock_() {}
ActiveRpcs::~ActiveRpcs() {
cartographer::common::MutexLocker locker(&lock_);
if (!rpcs_.empty()) {
LOG(FATAL) << "RPCs still in flight!";
}
}
Rpc* ActiveRpcs::Add(std::unique_ptr<Rpc> rpc) {
cartographer::common::MutexLocker locker(&lock_);
const auto result = rpcs_.emplace(rpc.release());
CHECK(result.second) << "RPC already active.";
return *result.first;
}
bool ActiveRpcs::Remove(Rpc* rpc) {
cartographer::common::MutexLocker locker(&lock_);
auto it = rpcs_.find(rpc);
if (it != rpcs_.end()) {
delete rpc;
rpcs_.erase(it);
return true;
}
return false;
}
} // namespace framework
} // namespace cartographer_grpc

View File

@ -0,0 +1,84 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_RPC_H
#define CARTOGRAPHER_GRPC_FRAMEWORK_RPC_H
#include <memory>
#include <unordered_set>
#include "cartographer/common/mutex.h"
#include "cartographer_grpc/framework/rpc_handler.h"
#include "grpc++/grpc++.h"
namespace cartographer_grpc {
namespace framework {
class Service;
class Rpc {
public:
enum class State { NEW_CONNECTION = 0, READ, WRITE, DONE };
struct RpcState {
const State state;
Rpc* rpc;
};
Rpc(const RpcHandlerInfo& rpc_handler_info);
::grpc::internal::RpcMethod::RpcType rpc_type() const;
::grpc::ServerContext* server_context() { return &server_context_; }
::grpc::internal::ServerAsyncStreamingInterface* responder();
RpcState* GetState(State state);
::google::protobuf::Message* request() { return request_.get(); }
::google::protobuf::Message* response() { return response_.get(); }
private:
Rpc(const Rpc&) = delete;
Rpc& operator=(const Rpc&) = delete;
RpcHandlerInfo rpc_handler_info_;
::grpc::ServerContext server_context_;
RpcState new_connection_state_ = RpcState{State::NEW_CONNECTION, this};
RpcState read_state_ = RpcState{State::READ, this};
RpcState write_state_ = RpcState{State::WRITE, this};
RpcState done_state_ = RpcState{State::DONE, this};
std::unique_ptr<google::protobuf::Message> request_;
std::unique_ptr<google::protobuf::Message> response_;
};
// This class keeps track of all in-flight RPCs for a 'Service'. Make sure that
// all RPCs have been terminated and removed from this object before it goes out
// of scope.
class ActiveRpcs {
public:
ActiveRpcs();
~ActiveRpcs() EXCLUDES(lock_);
Rpc* Add(std::unique_ptr<Rpc> rpc) EXCLUDES(lock_);
bool Remove(Rpc* rpc) EXCLUDES(lock_);
private:
cartographer::common::Mutex lock_;
std::unordered_set<Rpc*> rpcs_;
};
} // namespace framework
} // namespace cartographer_grpc
#endif // CARTOGRAPHER_GRPC_FRAMEWORK_RPC_H

View File

@ -17,7 +17,9 @@
#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_RPC_HANDLER_H #ifndef CARTOGRAPHER_GRPC_FRAMEWORK_RPC_HANDLER_H
#define CARTOGRAPHER_GRPC_FRAMEWORK_RPC_HANDLER_H #define CARTOGRAPHER_GRPC_FRAMEWORK_RPC_HANDLER_H
#include "cartographer_grpc/framework/type_traits.h"
#include "google/protobuf/message.h" #include "google/protobuf/message.h"
#include "grpc++/grpc++.h"
namespace cartographer_grpc { namespace cartographer_grpc {
namespace framework { namespace framework {
@ -35,6 +37,16 @@ struct RpcHandlerInfo {
const google::protobuf::Descriptor* request_descriptor; const google::protobuf::Descriptor* request_descriptor;
const google::protobuf::Descriptor* response_descriptor; const google::protobuf::Descriptor* response_descriptor;
const RpcHandlerFactory rpc_handler_factory; const RpcHandlerFactory rpc_handler_factory;
const grpc::internal::RpcMethod::RpcType rpc_type;
};
template <typename Incoming, typename Outgoing>
class RpcHandler : public RpcHandlerInterface {
public:
using IncomingType = Incoming;
using OutgoingType = Outgoing;
using RequestType = StripStream<Incoming>;
using ResponseType = StripStream<Outgoing>;
}; };
} // namespace framework } // namespace framework

View File

@ -55,7 +55,7 @@ void Server::AddService(
// Instantiate and register service. // Instantiate and register service.
const auto result = const auto result =
services_.emplace(std::piecewise_construct, std::make_tuple(service_name), services_.emplace(std::piecewise_construct, std::make_tuple(service_name),
std::make_tuple(rpc_handler_infos)); std::make_tuple(service_name, rpc_handler_infos));
CHECK(result.second) << "A service named " << service_name CHECK(result.second) << "A service named " << service_name
<< " already exists."; << " already exists.";
server_builder_.RegisterService(&result.first->second); server_builder_.RegisterService(&result.first->second);

View File

@ -62,7 +62,9 @@ class Server {
cartographer::common::make_unique<RpcHandlerType>(); cartographer::common::make_unique<RpcHandlerType>();
rpc_handler->SetRpc(rpc); rpc_handler->SetRpc(rpc);
return rpc_handler; return rpc_handler;
}}); },
RpcType<typename RpcHandlerType::Incoming,
typename RpcHandlerType::Outgoing>::value});
} }
private: private:

View File

@ -16,11 +16,53 @@
#include "cartographer_grpc/framework/server.h" #include "cartographer_grpc/framework/server.h"
#include "glog/logging.h"
#include "grpc++/impl/codegen/proto_utils.h"
namespace cartographer_grpc { namespace cartographer_grpc {
namespace framework { namespace framework {
Service::Service(const std::map<std::string, RpcHandlerInfo>& rpc_handlers) Service::Service(const std::string& service_name,
: rpc_handlers_(rpc_handlers) {} const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos)
: rpc_handler_infos_(rpc_handler_infos) {
for (const auto& rpc_handler_info : rpc_handler_infos_) {
std::string fully_qualified_method_name =
"/" + service_name + "/" + rpc_handler_info.first;
// The 'handler' below is set to 'nullptr' indicating that we want to
// handle this method asynchronously.
this->AddMethod(new grpc::internal::RpcServiceMethod(
fully_qualified_method_name.c_str(), rpc_handler_info.second.rpc_type,
nullptr /* handler */));
}
}
void Service::StartServing(
const std::vector<::grpc::ServerCompletionQueue*>& completion_queues) {
int i = 0;
for (const auto& rpc_handler_info : rpc_handler_infos_) {
for (auto completion_queue : completion_queues) {
Rpc* rpc = active_rpcs_.Add(
cartographer::common::make_unique<Rpc>(rpc_handler_info.second));
RequestNextMethodInvocation(i, rpc, completion_queue);
}
++i;
}
}
void Service::RequestNextMethodInvocation(
int method_index, Rpc* rpc,
::grpc::ServerCompletionQueue* completion_queue) {
switch (rpc->rpc_type()) {
case ::grpc::internal::RpcMethod::CLIENT_STREAMING:
RequestAsyncClientStreaming(method_index, rpc->server_context(),
rpc->responder(), completion_queue,
completion_queue,
rpc->GetState(Rpc::State::NEW_CONNECTION));
break;
default:
LOG(FATAL) << "RPC type not implemented.";
}
}
} // namespace framework } // namespace framework
} // namespace cartographer_grpc } // namespace cartographer_grpc

View File

@ -17,6 +17,7 @@
#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_SERVICE_H #ifndef CARTOGRAPHER_GRPC_FRAMEWORK_SERVICE_H
#define CARTOGRAPHER_GRPC_FRAMEWORK_SERVICE_H #define CARTOGRAPHER_GRPC_FRAMEWORK_SERVICE_H
#include "cartographer_grpc/framework/rpc.h"
#include "cartographer_grpc/framework/rpc_handler.h" #include "cartographer_grpc/framework/rpc_handler.h"
#include "grpc++/impl/codegen/service_type.h" #include "grpc++/impl/codegen/service_type.h"
@ -29,10 +30,18 @@ namespace framework {
// 'Rpc' handler objects. // 'Rpc' handler objects.
class Service : public ::grpc::Service { class Service : public ::grpc::Service {
public: public:
Service(const std::map<std::string, RpcHandlerInfo>& rpc_handlers); Service(const std::string& service_name,
const std::map<std::string, RpcHandlerInfo>& rpc_handlers);
void StartServing(
const std::vector<::grpc::ServerCompletionQueue*>& completion_queues);
private: private:
std::map<std::string, RpcHandlerInfo> rpc_handlers_; void RequestNextMethodInvocation(
int method_index, Rpc* rpc,
::grpc::ServerCompletionQueue* completion_queue);
std::map<std::string, RpcHandlerInfo> rpc_handler_infos_;
ActiveRpcs active_rpcs_;
}; };
} // namespace framework } // namespace framework

View File

@ -0,0 +1,69 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_TYPES_H
#define CARTOGRAPHER_GRPC_FRAMEWORK_TYPES_H
#include <grpc++/grpc++.h>
namespace cartographer_grpc {
namespace framework {
template <typename Request>
class Stream {
using type = Request;
};
template <template <typename> class, typename T>
struct Strip {
using type = T;
};
template <template <typename> class T, typename Param>
struct Strip<T, T<Param>> {
using type = Param;
};
template <typename T>
using StripStream = typename Strip<Stream, T>::type;
template <typename Incoming, typename Outgoing>
struct RpcType
: public std::integral_constant<grpc::internal::RpcMethod::RpcType,
grpc::internal::RpcMethod::NORMAL_RPC> {};
template <typename Incoming, typename Outgoing>
struct RpcType<Stream<Incoming>, Outgoing>
: public std::integral_constant<
grpc::internal::RpcMethod::RpcType,
grpc::internal::RpcMethod::CLIENT_STREAMING> {};
template <typename Incoming, typename Outgoing>
struct RpcType<Incoming, Stream<Outgoing>>
: public std::integral_constant<
grpc::internal::RpcMethod::RpcType,
grpc::internal::RpcMethod::SERVER_STREAMING> {};
template <typename Incoming, typename Outgoing>
struct RpcType<Stream<Incoming>, Stream<Outgoing>>
: public std::integral_constant<grpc::internal::RpcMethod::RpcType,
grpc::internal::RpcMethod::BIDI_STREAMING> {
};
} // namespace framework
} // namespace cartographer_grpc
#endif // CARTOGRAPHER_GRPC_FRAMEWORK_TYPES_H

View File

@ -0,0 +1,43 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer_grpc/framework/type_traits.h"
#include "gtest/gtest.h"
namespace cartographer_grpc {
namespace framework {
namespace {
TEST(TypeTraitsTest, StreamStripping) {
::testing::StaticAssertTypeEq<StripStream<Stream<int>>, int>();
::testing::StaticAssertTypeEq<StripStream<int>, int>();
}
TEST(TypeTraitsTest, RpcTypes) {
EXPECT_EQ((RpcType<int, int>::value),
::grpc::internal::RpcMethod::NORMAL_RPC);
EXPECT_EQ((RpcType<Stream<int>, int>::value),
::grpc::internal::RpcMethod::CLIENT_STREAMING);
EXPECT_EQ((RpcType<int, Stream<int>>::value),
::grpc::internal::RpcMethod::SERVER_STREAMING);
EXPECT_EQ((RpcType<Stream<int>, Stream<int>>::value),
::grpc::internal::RpcMethod::BIDI_STREAMING);
}
} // namespace
} // namespace framework
} // namespace cartographer_grpc

View File

@ -17,10 +17,8 @@
set -o errexit set -o errexit
set -o verbose set -o verbose
VERSION="v1.7.2"
# Build and install gRPC. # Build and install gRPC.
git clone -b ${VERSION} https://github.com/grpc/grpc git clone https://github.com/grpc/grpc
cd grpc cd grpc
git submodule update --init git submodule update --init
make make