diff --git a/cartographer_grpc/framework/event_queue_thread.cc b/cartographer_grpc/framework/event_queue_thread.cc new file mode 100644 index 0000000..f4ac275 --- /dev/null +++ b/cartographer_grpc/framework/event_queue_thread.cc @@ -0,0 +1,44 @@ +/* + * Copyright 2017 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cartographer_grpc/framework/event_queue_thread.h" + +#include "cartographer/common/make_unique.h" +#include "glog/logging.h" + +namespace cartographer_grpc { +namespace framework { + +EventQueueThread::EventQueueThread() { + event_queue_ = cartographer::common::make_unique(); +} + +EventQueue* EventQueueThread::event_queue() { return event_queue_.get(); } + +void EventQueueThread::Start(EventQueueRunner runner) { + CHECK(!thread_); + EventQueue* event_queue = event_queue_.get(); + thread_ = cartographer::common::make_unique( + [event_queue, runner]() { runner(event_queue); }); +} + +void EventQueueThread::Shutdown() { + LOG(INFO) << "Shutting down event queue " << event_queue_.get(); + thread_->join(); +} + +} // namespace framework +} // namespace cartographer_grpc diff --git a/cartographer_grpc/framework/event_queue_thread.h b/cartographer_grpc/framework/event_queue_thread.h new file mode 100644 index 0000000..e7d2189 --- /dev/null +++ b/cartographer_grpc/framework/event_queue_thread.h @@ -0,0 +1,50 @@ +/* + * Copyright 2017 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_EVENT_QUEUE_THREAD_H +#define CARTOGRAPHER_GRPC_FRAMEWORK_EVENT_QUEUE_THREAD_H + +#include +#include + +#include "cartographer/common/blocking_queue.h" +#include "cartographer_grpc/framework/rpc.h" + +namespace cartographer_grpc { +namespace framework { + +using EventQueue = cartographer::common::BlockingQueue; + +class EventQueueThread { + public: + using EventQueueRunner = std::function; + + EventQueueThread(); + + EventQueue* event_queue(); + + void Start(EventQueueRunner runner); + void Shutdown(); + + private: + std::unique_ptr event_queue_; + std::unique_ptr thread_; +}; + +} // namespace framework +} // namespace cartographer_grpc + +#endif // CARTOGRAPHER_GRPC_FRAMEWORK_EVENT_QUEUE_THREAD_H diff --git a/cartographer_grpc/framework/rpc.cc b/cartographer_grpc/framework/rpc.cc index e52d609..5dd5690 100644 --- a/cartographer_grpc/framework/rpc.cc +++ b/cartographer_grpc/framework/rpc.cc @@ -42,11 +42,12 @@ void SendUnaryFinish(ReaderWriter* reader_writer, ::grpc::Status status, Rpc::Rpc(int method_index, ::grpc::ServerCompletionQueue* server_completion_queue, - ExecutionContext* execution_context, + EventQueue* event_queue, ExecutionContext* execution_context, const RpcHandlerInfo& rpc_handler_info, Service* service, WeakPtrFactory weak_ptr_factory) : method_index_(method_index), server_completion_queue_(server_completion_queue), + event_queue_(event_queue), execution_context_(execution_context), rpc_handler_info_(rpc_handler_info), service_(service), @@ -65,7 +66,7 @@ Rpc::Rpc(int method_index, std::unique_ptr Rpc::Clone() { return cartographer::common::make_unique( - method_index_, server_completion_queue_, execution_context_, + method_index_, server_completion_queue_, event_queue_, execution_context_, rpc_handler_info_, service_, weak_ptr_factory_); } diff --git a/cartographer_grpc/framework/rpc.h b/cartographer_grpc/framework/rpc.h index 094015e..0c2ddb4 100644 --- a/cartographer_grpc/framework/rpc.h +++ b/cartographer_grpc/framework/rpc.h @@ -21,6 +21,7 @@ #include #include +#include "cartographer/common/blocking_queue.h" #include "cartographer/common/mutex.h" #include "cartographer_grpc/framework/execution_context.h" #include "cartographer_grpc/framework/rpc_handler_interface.h" @@ -37,15 +38,17 @@ namespace framework { class Service; class Rpc { public: + struct RpcEvent; + using EventQueue = cartographer::common::BlockingQueue; using WeakPtrFactory = std::function(Rpc*)>; enum class Event { NEW_CONNECTION = 0, READ, WRITE, FINISH, DONE }; struct RpcEvent { const Event event; std::weak_ptr rpc; + bool ok; }; - Rpc(int method_index, ::grpc::ServerCompletionQueue* server_completion_queue, - ExecutionContext* execution_context, + EventQueue* event_queue, ExecutionContext* execution_context, const RpcHandlerInfo& rpc_handler_info, Service* service, WeakPtrFactory weak_ptr_factory); std::unique_ptr Clone(); @@ -60,6 +63,8 @@ class Rpc { void SetRpcEventState(Event event, bool pending); bool IsRpcEventPending(Event event); bool IsAnyEventPending(); + void SetEventQueue(EventQueue* event_queue) { event_queue_ = event_queue; } + EventQueue* event_queue() { return event_queue_; } private: struct SendItem { @@ -84,6 +89,7 @@ class Rpc { int method_index_; ::grpc::ServerCompletionQueue* server_completion_queue_; + EventQueue* event_queue_; ExecutionContext* execution_context_; RpcHandlerInfo rpc_handler_info_; Service* service_; diff --git a/cartographer_grpc/framework/server.cc b/cartographer_grpc/framework/server.cc index 6a3e2fa..adb1042 100644 --- a/cartographer_grpc/framework/server.cc +++ b/cartographer_grpc/framework/server.cc @@ -20,9 +20,19 @@ namespace cartographer_grpc { namespace framework { +namespace { -void Server::Builder::SetNumberOfThreads(const size_t number_of_threads) { - options_.number_of_threads = number_of_threads; +const cartographer::common::Duration kPopEventTimeout = + cartographer::common::FromMilliseconds(100); + +} // namespace + +void Server::Builder::SetNumGrpcThreads(const size_t num_grpc_threads) { + options_.num_grpc_threads = num_grpc_threads; +} + +void Server::Builder::SetNumEventThreads(const std::size_t num_event_threads) { + options_.num_event_threads = num_event_threads; } void Server::Builder::SetServerAddress(const std::string& server_address) { @@ -41,8 +51,12 @@ Server::Server(const Options& options) : options_(options) { server_builder_.AddListeningPort(options_.server_address, grpc::InsecureServerCredentials()); + // Set up event queue threads. + event_queue_threads_ = + std::vector(options_.num_event_threads); + // Set up completion queues threads. - for (size_t i = 0; i < options_.number_of_threads; ++i) { + for (size_t i = 0; i < options_.num_grpc_threads; ++i) { completion_queue_threads_.emplace_back( server_builder_.AddCompletionQueue()); } @@ -52,9 +66,10 @@ void Server::AddService( const std::string& service_name, const std::map& rpc_handler_infos) { // Instantiate and register service. - const auto result = - services_.emplace(std::piecewise_construct, std::make_tuple(service_name), - std::make_tuple(service_name, rpc_handler_infos)); + const auto result = services_.emplace( + std::piecewise_construct, std::make_tuple(service_name), + std::make_tuple(service_name, rpc_handler_infos, + [this]() { return SelectNextEventQueueRoundRobin(); })); CHECK(result.second) << "A service named " << service_name << " already exists."; server_builder_.RegisterService(&result.first->second); @@ -66,12 +81,43 @@ void Server::RunCompletionQueue( void* tag; while (completion_queue->Next(&tag, &ok)) { auto* rpc_event = static_cast(tag); + rpc_event->ok = ok; if (auto rpc = rpc_event->rpc.lock()) { - rpc->service()->HandleEvent(rpc_event->event, rpc.get(), ok); + rpc->event_queue()->Push(rpc_event); } else { LOG(WARNING) << "Ignoring stale event."; } - delete rpc_event; + } +} + +void Server::ProcessRpcEvent(Rpc::RpcEvent* rpc_event) { + if (auto rpc = rpc_event->rpc.lock()) { + rpc->service()->HandleEvent(rpc_event->event, rpc.get(), rpc_event->ok); + } else { + LOG(WARNING) << "Ignoring stale event."; + } + delete rpc_event; +} + +EventQueue* Server::SelectNextEventQueueRoundRobin() { + cartographer::common::MutexLocker locker(¤t_event_queue_id_lock_); + current_event_queue_id_ = + (current_event_queue_id_ + 1) % options_.num_event_threads; + return event_queue_threads_.at(current_event_queue_id_).event_queue(); +} + +void Server::RunEventQueue(EventQueue* event_queue) { + while (!shutting_down_) { + Rpc::RpcEvent* rpc_event = event_queue->PopWithTimeout(kPopEventTimeout); + if (rpc_event) { + ProcessRpcEvent(rpc_event); + } + } + + // Finish processing the rest of the items. + while (Rpc::RpcEvent* rpc_event = + event_queue->PopWithTimeout(kPopEventTimeout)) { + ProcessRpcEvent(rpc_event); } } @@ -85,9 +131,18 @@ void Server::Start() { execution_context_.get()); } + // Start threads to process all event queues. + for (auto& event_queue_thread : event_queue_threads_) { + event_queue_thread.Start( + [this](EventQueue* event_queue) { RunEventQueue(event_queue); }); + } + // Start threads to process all completion queues. for (auto& completion_queue_threads : completion_queue_threads_) { - completion_queue_threads.Start(Server::RunCompletionQueue); + completion_queue_threads.Start( + [this](::grpc::ServerCompletionQueue* completion_queue) { + RunCompletionQueue(completion_queue); + }); } } @@ -101,6 +156,7 @@ void Server::WaitForShutdown() { void Server::Shutdown() { LOG(INFO) << "Shutting down server."; + shutting_down_ = true; // Tell the services to stop serving RPCs. for (auto& service : services_) { @@ -117,6 +173,10 @@ void Server::Shutdown() { completion_queue_threads.Shutdown(); } + for (auto& event_queue_thread : event_queue_threads_) { + event_queue_thread.Shutdown(); + } + LOG(INFO) << "Shutdown complete."; } diff --git a/cartographer_grpc/framework/server.h b/cartographer_grpc/framework/server.h index e76bd09..92fe51e 100644 --- a/cartographer_grpc/framework/server.h +++ b/cartographer_grpc/framework/server.h @@ -25,6 +25,7 @@ #include "cartographer/common/make_unique.h" #include "cartographer_grpc/framework/completion_queue_thread.h" +#include "cartographer_grpc/framework/event_queue_thread.h" #include "cartographer_grpc/framework/execution_context.h" #include "cartographer_grpc/framework/rpc_handler.h" #include "cartographer_grpc/framework/service.h" @@ -38,7 +39,8 @@ class Server { // All options that configure server behaviour such as number of threads, // ports etc. struct Options { - size_t number_of_threads = 4; + size_t num_grpc_threads; + size_t num_event_threads; std::string server_address = "0.0.0.0:50051"; }; @@ -49,7 +51,8 @@ class Server { Builder() = default; std::unique_ptr Build(); - void SetNumberOfThreads(std::size_t number_of_threads); + void SetNumGrpcThreads(std::size_t num_grpc_threads); + void SetNumEventThreads(std::size_t num_event_threads); void SetServerAddress(const std::string& server_address); template @@ -105,16 +108,18 @@ class Server { Server(const Options& options); Server(const Server&) = delete; Server& operator=(const Server&) = delete; - void AddService( const std::string& service_name, const std::map& rpc_handler_infos); - - static void RunCompletionQueue( - ::grpc::ServerCompletionQueue* completion_queue); + void RunCompletionQueue(::grpc::ServerCompletionQueue* completion_queue); + void RunEventQueue(EventQueue* event_queue); + void ProcessRpcEvent(Rpc::RpcEvent* rpc_event); + EventQueue* SelectNextEventQueueRoundRobin(); Options options_; + bool shutting_down_ = false; + // gRPC objects needed to build a server. ::grpc::ServerBuilder server_builder_; std::unique_ptr<::grpc::Server> server_; @@ -122,6 +127,11 @@ class Server { // Threads processing the completion queues. std::vector completion_queue_threads_; + // Threads processing RPC events. + std::vector event_queue_threads_; + cartographer::common::Mutex current_event_queue_id_lock_; + int current_event_queue_id_ = 0; + // Map of service names to services. std::map services_; diff --git a/cartographer_grpc/framework/server_test.cc b/cartographer_grpc/framework/server_test.cc index 3cb4f31..c6873d6 100644 --- a/cartographer_grpc/framework/server_test.cc +++ b/cartographer_grpc/framework/server_test.cc @@ -93,7 +93,8 @@ class ServerTest : public ::testing::Test { void SetUp() override { Server::Builder server_builder; server_builder.SetServerAddress(kServerAddress); - server_builder.SetNumberOfThreads(kNumThreads); + server_builder.SetNumGrpcThreads(kNumThreads); + server_builder.SetNumEventThreads(kNumThreads); server_builder.RegisterHandler("GetSum"); server_builder.RegisterHandler("GetSquare"); server_builder.RegisterHandler( diff --git a/cartographer_grpc/framework/service.cc b/cartographer_grpc/framework/service.cc index c501054..f40ecb9 100644 --- a/cartographer_grpc/framework/service.cc +++ b/cartographer_grpc/framework/service.cc @@ -16,6 +16,8 @@ #include "cartographer_grpc/framework/server.h" +#include + #include "glog/logging.h" #include "grpc++/impl/codegen/proto_utils.h" @@ -23,8 +25,10 @@ namespace cartographer_grpc { namespace framework { Service::Service(const std::string& service_name, - const std::map& rpc_handler_infos) - : rpc_handler_infos_(rpc_handler_infos) { + const std::map& rpc_handler_infos, + EventQueueSelector event_queue_selector) + : rpc_handler_infos_(rpc_handler_infos), + event_queue_selector_(event_queue_selector) { for (const auto& rpc_handler_info : rpc_handler_infos_) { // The 'handler' below is set to 'nullptr' indicating that we want to // handle this method asynchronously. @@ -42,7 +46,8 @@ void Service::StartServing( for (auto& completion_queue_thread : completion_queue_threads) { std::shared_ptr rpc = active_rpcs_.Add(cartographer::common::make_unique( - i, completion_queue_thread.completion_queue(), execution_context, + i, completion_queue_thread.completion_queue(), + event_queue_selector_(), execution_context, rpc_handler_info.second, this, active_rpcs_.GetWeakPtrFactory())); rpc->RequestNextMethodInvocation(); } @@ -93,8 +98,10 @@ void Service::HandleNewConnection(Rpc* rpc, bool ok) { } // Create new active rpc to handle next connection and register it for the - // incoming connection. - active_rpcs_.Add(rpc->Clone())->RequestNextMethodInvocation(); + // incoming connection. Assign event queue in a round-robin fashion. + std::unique_ptr new_rpc = rpc->Clone(); + new_rpc->SetEventQueue(event_queue_selector_()); + active_rpcs_.Add(std::move(new_rpc))->RequestNextMethodInvocation(); } void Service::HandleRead(Rpc* rpc, bool ok) { diff --git a/cartographer_grpc/framework/service.h b/cartographer_grpc/framework/service.h index b3f60a2..895b0e7 100644 --- a/cartographer_grpc/framework/service.h +++ b/cartographer_grpc/framework/service.h @@ -18,6 +18,7 @@ #define CARTOGRAPHER_GRPC_FRAMEWORK_SERVICE_H #include "cartographer_grpc/framework/completion_queue_thread.h" +#include "cartographer_grpc/framework/event_queue_thread.h" #include "cartographer_grpc/framework/execution_context.h" #include "cartographer_grpc/framework/rpc.h" #include "cartographer_grpc/framework/rpc_handler.h" @@ -32,10 +33,12 @@ namespace framework { // 'Rpc' handler objects. class Service : public ::grpc::Service { public: + using EventQueueSelector = std::function; friend class Rpc; Service(const std::string& service_name, - const std::map& rpc_handlers); + const std::map& rpc_handlers, + EventQueueSelector event_queue_selector); void StartServing(std::vector& completion_queues, ExecutionContext* execution_context); void HandleEvent(Rpc::Event event, Rpc* rpc, bool ok); @@ -51,6 +54,7 @@ class Service : public ::grpc::Service { void RemoveIfNotPending(Rpc* rpc); std::map rpc_handler_infos_; + EventQueueSelector event_queue_selector_; ActiveRpcs active_rpcs_; bool shutting_down_ = false; }; diff --git a/cartographer_grpc/map_builder_server.cc b/cartographer_grpc/map_builder_server.cc index a65740c..d4d1789 100644 --- a/cartographer_grpc/map_builder_server.cc +++ b/cartographer_grpc/map_builder_server.cc @@ -52,8 +52,10 @@ MapBuilderServer::MapBuilderServer( : map_builder_(map_builder_server_options.map_builder_options()) { framework::Server::Builder server_builder; server_builder.SetServerAddress(map_builder_server_options.server_address()); - server_builder.SetNumberOfThreads( + server_builder.SetNumGrpcThreads( map_builder_server_options.num_grpc_threads()); + server_builder.SetNumEventThreads( + map_builder_server_options.num_event_threads()); server_builder.RegisterHandler("AddTrajectory"); server_builder.RegisterHandlerGetString("server_address")); map_builder_server_options.set_num_grpc_threads( lua_parameter_dictionary->GetInt("num_grpc_threads")); + map_builder_server_options.set_num_event_threads( + lua_parameter_dictionary->GetInt("num_event_threads")); *map_builder_server_options.mutable_map_builder_options() = cartographer::mapping::CreateMapBuilderOptions( lua_parameter_dictionary->GetDictionary("map_builder").get()); diff --git a/cartographer_grpc/proto/map_builder_server_options.proto b/cartographer_grpc/proto/map_builder_server_options.proto index 72748b4..9b82b32 100644 --- a/cartographer_grpc/proto/map_builder_server_options.proto +++ b/cartographer_grpc/proto/map_builder_server_options.proto @@ -21,5 +21,6 @@ package cartographer_grpc.proto; message MapBuilderServerOptions { string server_address = 1; int32 num_grpc_threads = 2; - cartographer.mapping.proto.MapBuilderOptions map_builder_options = 3; + int32 num_event_threads = 3; + cartographer.mapping.proto.MapBuilderOptions map_builder_options = 4; }