Implement EventQueues. (#759)

master
Christoph Schütte 2017-12-15 12:21:44 +01:00 committed by Wally B. Feed
parent e023ec5ecc
commit 69f74a11ba
12 changed files with 216 additions and 28 deletions

View File

@ -0,0 +1,44 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cartographer_grpc/framework/event_queue_thread.h"
#include "cartographer/common/make_unique.h"
#include "glog/logging.h"
namespace cartographer_grpc {
namespace framework {
EventQueueThread::EventQueueThread() {
event_queue_ = cartographer::common::make_unique<EventQueue>();
}
EventQueue* EventQueueThread::event_queue() { return event_queue_.get(); }
void EventQueueThread::Start(EventQueueRunner runner) {
CHECK(!thread_);
EventQueue* event_queue = event_queue_.get();
thread_ = cartographer::common::make_unique<std::thread>(
[event_queue, runner]() { runner(event_queue); });
}
void EventQueueThread::Shutdown() {
LOG(INFO) << "Shutting down event queue " << event_queue_.get();
thread_->join();
}
} // namespace framework
} // namespace cartographer_grpc

View File

@ -0,0 +1,50 @@
/*
* Copyright 2017 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_EVENT_QUEUE_THREAD_H
#define CARTOGRAPHER_GRPC_FRAMEWORK_EVENT_QUEUE_THREAD_H
#include <memory>
#include <thread>
#include "cartographer/common/blocking_queue.h"
#include "cartographer_grpc/framework/rpc.h"
namespace cartographer_grpc {
namespace framework {
using EventQueue = cartographer::common::BlockingQueue<Rpc::RpcEvent*>;
class EventQueueThread {
public:
using EventQueueRunner = std::function<void(EventQueue*)>;
EventQueueThread();
EventQueue* event_queue();
void Start(EventQueueRunner runner);
void Shutdown();
private:
std::unique_ptr<EventQueue> event_queue_;
std::unique_ptr<std::thread> thread_;
};
} // namespace framework
} // namespace cartographer_grpc
#endif // CARTOGRAPHER_GRPC_FRAMEWORK_EVENT_QUEUE_THREAD_H

View File

@ -42,11 +42,12 @@ void SendUnaryFinish(ReaderWriter* reader_writer, ::grpc::Status status,
Rpc::Rpc(int method_index, Rpc::Rpc(int method_index,
::grpc::ServerCompletionQueue* server_completion_queue, ::grpc::ServerCompletionQueue* server_completion_queue,
ExecutionContext* execution_context, EventQueue* event_queue, ExecutionContext* execution_context,
const RpcHandlerInfo& rpc_handler_info, Service* service, const RpcHandlerInfo& rpc_handler_info, Service* service,
WeakPtrFactory weak_ptr_factory) WeakPtrFactory weak_ptr_factory)
: method_index_(method_index), : method_index_(method_index),
server_completion_queue_(server_completion_queue), server_completion_queue_(server_completion_queue),
event_queue_(event_queue),
execution_context_(execution_context), execution_context_(execution_context),
rpc_handler_info_(rpc_handler_info), rpc_handler_info_(rpc_handler_info),
service_(service), service_(service),
@ -65,7 +66,7 @@ Rpc::Rpc(int method_index,
std::unique_ptr<Rpc> Rpc::Clone() { std::unique_ptr<Rpc> Rpc::Clone() {
return cartographer::common::make_unique<Rpc>( return cartographer::common::make_unique<Rpc>(
method_index_, server_completion_queue_, execution_context_, method_index_, server_completion_queue_, event_queue_, execution_context_,
rpc_handler_info_, service_, weak_ptr_factory_); rpc_handler_info_, service_, weak_ptr_factory_);
} }

View File

@ -21,6 +21,7 @@
#include <queue> #include <queue>
#include <unordered_set> #include <unordered_set>
#include "cartographer/common/blocking_queue.h"
#include "cartographer/common/mutex.h" #include "cartographer/common/mutex.h"
#include "cartographer_grpc/framework/execution_context.h" #include "cartographer_grpc/framework/execution_context.h"
#include "cartographer_grpc/framework/rpc_handler_interface.h" #include "cartographer_grpc/framework/rpc_handler_interface.h"
@ -37,15 +38,17 @@ namespace framework {
class Service; class Service;
class Rpc { class Rpc {
public: public:
struct RpcEvent;
using EventQueue = cartographer::common::BlockingQueue<RpcEvent*>;
using WeakPtrFactory = std::function<std::weak_ptr<Rpc>(Rpc*)>; using WeakPtrFactory = std::function<std::weak_ptr<Rpc>(Rpc*)>;
enum class Event { NEW_CONNECTION = 0, READ, WRITE, FINISH, DONE }; enum class Event { NEW_CONNECTION = 0, READ, WRITE, FINISH, DONE };
struct RpcEvent { struct RpcEvent {
const Event event; const Event event;
std::weak_ptr<Rpc> rpc; std::weak_ptr<Rpc> rpc;
bool ok;
}; };
Rpc(int method_index, ::grpc::ServerCompletionQueue* server_completion_queue, Rpc(int method_index, ::grpc::ServerCompletionQueue* server_completion_queue,
ExecutionContext* execution_context, EventQueue* event_queue, ExecutionContext* execution_context,
const RpcHandlerInfo& rpc_handler_info, Service* service, const RpcHandlerInfo& rpc_handler_info, Service* service,
WeakPtrFactory weak_ptr_factory); WeakPtrFactory weak_ptr_factory);
std::unique_ptr<Rpc> Clone(); std::unique_ptr<Rpc> Clone();
@ -60,6 +63,8 @@ class Rpc {
void SetRpcEventState(Event event, bool pending); void SetRpcEventState(Event event, bool pending);
bool IsRpcEventPending(Event event); bool IsRpcEventPending(Event event);
bool IsAnyEventPending(); bool IsAnyEventPending();
void SetEventQueue(EventQueue* event_queue) { event_queue_ = event_queue; }
EventQueue* event_queue() { return event_queue_; }
private: private:
struct SendItem { struct SendItem {
@ -84,6 +89,7 @@ class Rpc {
int method_index_; int method_index_;
::grpc::ServerCompletionQueue* server_completion_queue_; ::grpc::ServerCompletionQueue* server_completion_queue_;
EventQueue* event_queue_;
ExecutionContext* execution_context_; ExecutionContext* execution_context_;
RpcHandlerInfo rpc_handler_info_; RpcHandlerInfo rpc_handler_info_;
Service* service_; Service* service_;

View File

@ -20,9 +20,19 @@
namespace cartographer_grpc { namespace cartographer_grpc {
namespace framework { namespace framework {
namespace {
void Server::Builder::SetNumberOfThreads(const size_t number_of_threads) { const cartographer::common::Duration kPopEventTimeout =
options_.number_of_threads = number_of_threads; cartographer::common::FromMilliseconds(100);
} // namespace
void Server::Builder::SetNumGrpcThreads(const size_t num_grpc_threads) {
options_.num_grpc_threads = num_grpc_threads;
}
void Server::Builder::SetNumEventThreads(const std::size_t num_event_threads) {
options_.num_event_threads = num_event_threads;
} }
void Server::Builder::SetServerAddress(const std::string& server_address) { void Server::Builder::SetServerAddress(const std::string& server_address) {
@ -41,8 +51,12 @@ Server::Server(const Options& options) : options_(options) {
server_builder_.AddListeningPort(options_.server_address, server_builder_.AddListeningPort(options_.server_address,
grpc::InsecureServerCredentials()); grpc::InsecureServerCredentials());
// Set up event queue threads.
event_queue_threads_ =
std::vector<EventQueueThread>(options_.num_event_threads);
// Set up completion queues threads. // Set up completion queues threads.
for (size_t i = 0; i < options_.number_of_threads; ++i) { for (size_t i = 0; i < options_.num_grpc_threads; ++i) {
completion_queue_threads_.emplace_back( completion_queue_threads_.emplace_back(
server_builder_.AddCompletionQueue()); server_builder_.AddCompletionQueue());
} }
@ -52,9 +66,10 @@ void Server::AddService(
const std::string& service_name, const std::string& service_name,
const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos) { const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos) {
// Instantiate and register service. // Instantiate and register service.
const auto result = const auto result = services_.emplace(
services_.emplace(std::piecewise_construct, std::make_tuple(service_name), std::piecewise_construct, std::make_tuple(service_name),
std::make_tuple(service_name, rpc_handler_infos)); std::make_tuple(service_name, rpc_handler_infos,
[this]() { return SelectNextEventQueueRoundRobin(); }));
CHECK(result.second) << "A service named " << service_name CHECK(result.second) << "A service named " << service_name
<< " already exists."; << " already exists.";
server_builder_.RegisterService(&result.first->second); server_builder_.RegisterService(&result.first->second);
@ -66,12 +81,43 @@ void Server::RunCompletionQueue(
void* tag; void* tag;
while (completion_queue->Next(&tag, &ok)) { while (completion_queue->Next(&tag, &ok)) {
auto* rpc_event = static_cast<Rpc::RpcEvent*>(tag); auto* rpc_event = static_cast<Rpc::RpcEvent*>(tag);
rpc_event->ok = ok;
if (auto rpc = rpc_event->rpc.lock()) { if (auto rpc = rpc_event->rpc.lock()) {
rpc->service()->HandleEvent(rpc_event->event, rpc.get(), ok); rpc->event_queue()->Push(rpc_event);
} else {
LOG(WARNING) << "Ignoring stale event.";
}
}
}
void Server::ProcessRpcEvent(Rpc::RpcEvent* rpc_event) {
if (auto rpc = rpc_event->rpc.lock()) {
rpc->service()->HandleEvent(rpc_event->event, rpc.get(), rpc_event->ok);
} else { } else {
LOG(WARNING) << "Ignoring stale event."; LOG(WARNING) << "Ignoring stale event.";
} }
delete rpc_event; delete rpc_event;
}
EventQueue* Server::SelectNextEventQueueRoundRobin() {
cartographer::common::MutexLocker locker(&current_event_queue_id_lock_);
current_event_queue_id_ =
(current_event_queue_id_ + 1) % options_.num_event_threads;
return event_queue_threads_.at(current_event_queue_id_).event_queue();
}
void Server::RunEventQueue(EventQueue* event_queue) {
while (!shutting_down_) {
Rpc::RpcEvent* rpc_event = event_queue->PopWithTimeout(kPopEventTimeout);
if (rpc_event) {
ProcessRpcEvent(rpc_event);
}
}
// Finish processing the rest of the items.
while (Rpc::RpcEvent* rpc_event =
event_queue->PopWithTimeout(kPopEventTimeout)) {
ProcessRpcEvent(rpc_event);
} }
} }
@ -85,9 +131,18 @@ void Server::Start() {
execution_context_.get()); execution_context_.get());
} }
// Start threads to process all event queues.
for (auto& event_queue_thread : event_queue_threads_) {
event_queue_thread.Start(
[this](EventQueue* event_queue) { RunEventQueue(event_queue); });
}
// Start threads to process all completion queues. // Start threads to process all completion queues.
for (auto& completion_queue_threads : completion_queue_threads_) { for (auto& completion_queue_threads : completion_queue_threads_) {
completion_queue_threads.Start(Server::RunCompletionQueue); completion_queue_threads.Start(
[this](::grpc::ServerCompletionQueue* completion_queue) {
RunCompletionQueue(completion_queue);
});
} }
} }
@ -101,6 +156,7 @@ void Server::WaitForShutdown() {
void Server::Shutdown() { void Server::Shutdown() {
LOG(INFO) << "Shutting down server."; LOG(INFO) << "Shutting down server.";
shutting_down_ = true;
// Tell the services to stop serving RPCs. // Tell the services to stop serving RPCs.
for (auto& service : services_) { for (auto& service : services_) {
@ -117,6 +173,10 @@ void Server::Shutdown() {
completion_queue_threads.Shutdown(); completion_queue_threads.Shutdown();
} }
for (auto& event_queue_thread : event_queue_threads_) {
event_queue_thread.Shutdown();
}
LOG(INFO) << "Shutdown complete."; LOG(INFO) << "Shutdown complete.";
} }

View File

@ -25,6 +25,7 @@
#include "cartographer/common/make_unique.h" #include "cartographer/common/make_unique.h"
#include "cartographer_grpc/framework/completion_queue_thread.h" #include "cartographer_grpc/framework/completion_queue_thread.h"
#include "cartographer_grpc/framework/event_queue_thread.h"
#include "cartographer_grpc/framework/execution_context.h" #include "cartographer_grpc/framework/execution_context.h"
#include "cartographer_grpc/framework/rpc_handler.h" #include "cartographer_grpc/framework/rpc_handler.h"
#include "cartographer_grpc/framework/service.h" #include "cartographer_grpc/framework/service.h"
@ -38,7 +39,8 @@ class Server {
// All options that configure server behaviour such as number of threads, // All options that configure server behaviour such as number of threads,
// ports etc. // ports etc.
struct Options { struct Options {
size_t number_of_threads = 4; size_t num_grpc_threads;
size_t num_event_threads;
std::string server_address = "0.0.0.0:50051"; std::string server_address = "0.0.0.0:50051";
}; };
@ -49,7 +51,8 @@ class Server {
Builder() = default; Builder() = default;
std::unique_ptr<Server> Build(); std::unique_ptr<Server> Build();
void SetNumberOfThreads(std::size_t number_of_threads); void SetNumGrpcThreads(std::size_t num_grpc_threads);
void SetNumEventThreads(std::size_t num_event_threads);
void SetServerAddress(const std::string& server_address); void SetServerAddress(const std::string& server_address);
template <typename RpcHandlerType, typename ServiceType> template <typename RpcHandlerType, typename ServiceType>
@ -105,16 +108,18 @@ class Server {
Server(const Options& options); Server(const Options& options);
Server(const Server&) = delete; Server(const Server&) = delete;
Server& operator=(const Server&) = delete; Server& operator=(const Server&) = delete;
void AddService( void AddService(
const std::string& service_name, const std::string& service_name,
const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos); const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos);
void RunCompletionQueue(::grpc::ServerCompletionQueue* completion_queue);
static void RunCompletionQueue( void RunEventQueue(EventQueue* event_queue);
::grpc::ServerCompletionQueue* completion_queue); void ProcessRpcEvent(Rpc::RpcEvent* rpc_event);
EventQueue* SelectNextEventQueueRoundRobin();
Options options_; Options options_;
bool shutting_down_ = false;
// gRPC objects needed to build a server. // gRPC objects needed to build a server.
::grpc::ServerBuilder server_builder_; ::grpc::ServerBuilder server_builder_;
std::unique_ptr<::grpc::Server> server_; std::unique_ptr<::grpc::Server> server_;
@ -122,6 +127,11 @@ class Server {
// Threads processing the completion queues. // Threads processing the completion queues.
std::vector<CompletionQueueThread> completion_queue_threads_; std::vector<CompletionQueueThread> completion_queue_threads_;
// Threads processing RPC events.
std::vector<EventQueueThread> event_queue_threads_;
cartographer::common::Mutex current_event_queue_id_lock_;
int current_event_queue_id_ = 0;
// Map of service names to services. // Map of service names to services.
std::map<std::string, Service> services_; std::map<std::string, Service> services_;

View File

@ -93,7 +93,8 @@ class ServerTest : public ::testing::Test {
void SetUp() override { void SetUp() override {
Server::Builder server_builder; Server::Builder server_builder;
server_builder.SetServerAddress(kServerAddress); server_builder.SetServerAddress(kServerAddress);
server_builder.SetNumberOfThreads(kNumThreads); server_builder.SetNumGrpcThreads(kNumThreads);
server_builder.SetNumEventThreads(kNumThreads);
server_builder.RegisterHandler<GetSumHandler, proto::Math>("GetSum"); server_builder.RegisterHandler<GetSumHandler, proto::Math>("GetSum");
server_builder.RegisterHandler<GetSquareHandler, proto::Math>("GetSquare"); server_builder.RegisterHandler<GetSquareHandler, proto::Math>("GetSquare");
server_builder.RegisterHandler<GetRunningSumHandler, proto::Math>( server_builder.RegisterHandler<GetRunningSumHandler, proto::Math>(

View File

@ -16,6 +16,8 @@
#include "cartographer_grpc/framework/server.h" #include "cartographer_grpc/framework/server.h"
#include <cstdlib>
#include "glog/logging.h" #include "glog/logging.h"
#include "grpc++/impl/codegen/proto_utils.h" #include "grpc++/impl/codegen/proto_utils.h"
@ -23,8 +25,10 @@ namespace cartographer_grpc {
namespace framework { namespace framework {
Service::Service(const std::string& service_name, Service::Service(const std::string& service_name,
const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos) const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos,
: rpc_handler_infos_(rpc_handler_infos) { EventQueueSelector event_queue_selector)
: rpc_handler_infos_(rpc_handler_infos),
event_queue_selector_(event_queue_selector) {
for (const auto& rpc_handler_info : rpc_handler_infos_) { for (const auto& rpc_handler_info : rpc_handler_infos_) {
// The 'handler' below is set to 'nullptr' indicating that we want to // The 'handler' below is set to 'nullptr' indicating that we want to
// handle this method asynchronously. // handle this method asynchronously.
@ -42,7 +46,8 @@ void Service::StartServing(
for (auto& completion_queue_thread : completion_queue_threads) { for (auto& completion_queue_thread : completion_queue_threads) {
std::shared_ptr<Rpc> rpc = std::shared_ptr<Rpc> rpc =
active_rpcs_.Add(cartographer::common::make_unique<Rpc>( active_rpcs_.Add(cartographer::common::make_unique<Rpc>(
i, completion_queue_thread.completion_queue(), execution_context, i, completion_queue_thread.completion_queue(),
event_queue_selector_(), execution_context,
rpc_handler_info.second, this, active_rpcs_.GetWeakPtrFactory())); rpc_handler_info.second, this, active_rpcs_.GetWeakPtrFactory()));
rpc->RequestNextMethodInvocation(); rpc->RequestNextMethodInvocation();
} }
@ -93,8 +98,10 @@ void Service::HandleNewConnection(Rpc* rpc, bool ok) {
} }
// Create new active rpc to handle next connection and register it for the // Create new active rpc to handle next connection and register it for the
// incoming connection. // incoming connection. Assign event queue in a round-robin fashion.
active_rpcs_.Add(rpc->Clone())->RequestNextMethodInvocation(); std::unique_ptr<Rpc> new_rpc = rpc->Clone();
new_rpc->SetEventQueue(event_queue_selector_());
active_rpcs_.Add(std::move(new_rpc))->RequestNextMethodInvocation();
} }
void Service::HandleRead(Rpc* rpc, bool ok) { void Service::HandleRead(Rpc* rpc, bool ok) {

View File

@ -18,6 +18,7 @@
#define CARTOGRAPHER_GRPC_FRAMEWORK_SERVICE_H #define CARTOGRAPHER_GRPC_FRAMEWORK_SERVICE_H
#include "cartographer_grpc/framework/completion_queue_thread.h" #include "cartographer_grpc/framework/completion_queue_thread.h"
#include "cartographer_grpc/framework/event_queue_thread.h"
#include "cartographer_grpc/framework/execution_context.h" #include "cartographer_grpc/framework/execution_context.h"
#include "cartographer_grpc/framework/rpc.h" #include "cartographer_grpc/framework/rpc.h"
#include "cartographer_grpc/framework/rpc_handler.h" #include "cartographer_grpc/framework/rpc_handler.h"
@ -32,10 +33,12 @@ namespace framework {
// 'Rpc' handler objects. // 'Rpc' handler objects.
class Service : public ::grpc::Service { class Service : public ::grpc::Service {
public: public:
using EventQueueSelector = std::function<EventQueue*()>;
friend class Rpc; friend class Rpc;
Service(const std::string& service_name, Service(const std::string& service_name,
const std::map<std::string, RpcHandlerInfo>& rpc_handlers); const std::map<std::string, RpcHandlerInfo>& rpc_handlers,
EventQueueSelector event_queue_selector);
void StartServing(std::vector<CompletionQueueThread>& completion_queues, void StartServing(std::vector<CompletionQueueThread>& completion_queues,
ExecutionContext* execution_context); ExecutionContext* execution_context);
void HandleEvent(Rpc::Event event, Rpc* rpc, bool ok); void HandleEvent(Rpc::Event event, Rpc* rpc, bool ok);
@ -51,6 +54,7 @@ class Service : public ::grpc::Service {
void RemoveIfNotPending(Rpc* rpc); void RemoveIfNotPending(Rpc* rpc);
std::map<std::string, RpcHandlerInfo> rpc_handler_infos_; std::map<std::string, RpcHandlerInfo> rpc_handler_infos_;
EventQueueSelector event_queue_selector_;
ActiveRpcs active_rpcs_; ActiveRpcs active_rpcs_;
bool shutting_down_ = false; bool shutting_down_ = false;
}; };

View File

@ -52,8 +52,10 @@ MapBuilderServer::MapBuilderServer(
: map_builder_(map_builder_server_options.map_builder_options()) { : map_builder_(map_builder_server_options.map_builder_options()) {
framework::Server::Builder server_builder; framework::Server::Builder server_builder;
server_builder.SetServerAddress(map_builder_server_options.server_address()); server_builder.SetServerAddress(map_builder_server_options.server_address());
server_builder.SetNumberOfThreads( server_builder.SetNumGrpcThreads(
map_builder_server_options.num_grpc_threads()); map_builder_server_options.num_grpc_threads());
server_builder.SetNumEventThreads(
map_builder_server_options.num_event_threads());
server_builder.RegisterHandler<handlers::AddTrajectoryHandler, server_builder.RegisterHandler<handlers::AddTrajectoryHandler,
proto::MapBuilderService>("AddTrajectory"); proto::MapBuilderService>("AddTrajectory");
server_builder.RegisterHandler<handlers::AddOdometryDataHandler, server_builder.RegisterHandler<handlers::AddOdometryDataHandler,

View File

@ -29,6 +29,8 @@ proto::MapBuilderServerOptions CreateMapBuilderServerOptions(
lua_parameter_dictionary->GetString("server_address")); lua_parameter_dictionary->GetString("server_address"));
map_builder_server_options.set_num_grpc_threads( map_builder_server_options.set_num_grpc_threads(
lua_parameter_dictionary->GetInt("num_grpc_threads")); lua_parameter_dictionary->GetInt("num_grpc_threads"));
map_builder_server_options.set_num_event_threads(
lua_parameter_dictionary->GetInt("num_event_threads"));
*map_builder_server_options.mutable_map_builder_options() = *map_builder_server_options.mutable_map_builder_options() =
cartographer::mapping::CreateMapBuilderOptions( cartographer::mapping::CreateMapBuilderOptions(
lua_parameter_dictionary->GetDictionary("map_builder").get()); lua_parameter_dictionary->GetDictionary("map_builder").get());

View File

@ -21,5 +21,6 @@ package cartographer_grpc.proto;
message MapBuilderServerOptions { message MapBuilderServerOptions {
string server_address = 1; string server_address = 1;
int32 num_grpc_threads = 2; int32 num_grpc_threads = 2;
cartographer.mapping.proto.MapBuilderOptions map_builder_options = 3; int32 num_event_threads = 3;
cartographer.mapping.proto.MapBuilderOptions map_builder_options = 4;
} }