2017-11-23 22:37:30 +08:00
|
|
|
/*
|
|
|
|
* Copyright 2017 The Cartographer Authors
|
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include "cartographer_grpc/framework/server.h"
|
|
|
|
|
2017-11-25 06:41:58 +08:00
|
|
|
#include "glog/logging.h"
|
|
|
|
#include "grpc++/impl/codegen/proto_utils.h"
|
|
|
|
|
2017-11-23 22:37:30 +08:00
|
|
|
namespace cartographer_grpc {
|
|
|
|
namespace framework {
|
|
|
|
|
2017-11-25 06:41:58 +08:00
|
|
|
Service::Service(const std::string& service_name,
|
|
|
|
const std::map<std::string, RpcHandlerInfo>& rpc_handler_infos)
|
|
|
|
: rpc_handler_infos_(rpc_handler_infos) {
|
|
|
|
for (const auto& rpc_handler_info : rpc_handler_infos_) {
|
|
|
|
// The 'handler' below is set to 'nullptr' indicating that we want to
|
|
|
|
// handle this method asynchronously.
|
|
|
|
this->AddMethod(new grpc::internal::RpcServiceMethod(
|
2017-11-30 20:18:16 +08:00
|
|
|
rpc_handler_info.second.fully_qualified_name.c_str(),
|
|
|
|
rpc_handler_info.second.rpc_type, nullptr /* handler */));
|
2017-11-25 06:41:58 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void Service::StartServing(
|
2017-11-29 21:05:31 +08:00
|
|
|
std::vector<CompletionQueueThread>& completion_queue_threads,
|
|
|
|
ExecutionContext* execution_context) {
|
2017-11-25 06:41:58 +08:00
|
|
|
int i = 0;
|
|
|
|
for (const auto& rpc_handler_info : rpc_handler_infos_) {
|
2017-11-28 17:50:30 +08:00
|
|
|
for (auto& completion_queue_thread : completion_queue_threads) {
|
|
|
|
Rpc* rpc = active_rpcs_.Add(cartographer::common::make_unique<Rpc>(
|
2017-11-29 21:05:31 +08:00
|
|
|
i, completion_queue_thread.completion_queue(), execution_context,
|
2017-11-28 17:50:30 +08:00
|
|
|
rpc_handler_info.second, this));
|
2017-11-29 17:40:26 +08:00
|
|
|
rpc->RequestNextMethodInvocation();
|
2017-11-25 06:41:58 +08:00
|
|
|
}
|
|
|
|
++i;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-11-28 17:50:30 +08:00
|
|
|
void Service::StopServing() { shutting_down_ = true; }
|
|
|
|
|
2017-11-29 17:40:26 +08:00
|
|
|
void Service::HandleEvent(Rpc::Event event, Rpc* rpc, bool ok) {
|
|
|
|
rpc->GetRpcEvent(event)->pending = false;
|
|
|
|
switch (event) {
|
|
|
|
case Rpc::Event::NEW_CONNECTION:
|
2017-11-28 17:50:30 +08:00
|
|
|
HandleNewConnection(rpc, ok);
|
|
|
|
break;
|
2017-11-29 17:40:26 +08:00
|
|
|
case Rpc::Event::READ:
|
|
|
|
HandleRead(rpc, ok);
|
2017-11-28 17:50:30 +08:00
|
|
|
break;
|
2017-11-29 17:40:26 +08:00
|
|
|
case Rpc::Event::WRITE:
|
|
|
|
HandleWrite(rpc, ok);
|
2017-11-28 17:50:30 +08:00
|
|
|
break;
|
2017-11-30 20:18:16 +08:00
|
|
|
case Rpc::Event::FINISH:
|
|
|
|
HandleFinish(rpc, ok);
|
|
|
|
break;
|
2017-11-29 17:40:26 +08:00
|
|
|
case Rpc::Event::DONE:
|
2017-11-28 17:50:30 +08:00
|
|
|
HandleDone(rpc, ok);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void Service::HandleNewConnection(Rpc* rpc, bool ok) {
|
|
|
|
if (shutting_down_) {
|
2017-11-29 17:40:26 +08:00
|
|
|
if (ok) {
|
|
|
|
LOG(WARNING) << "Server shutting down. Refusing to handle new RPCs.";
|
|
|
|
}
|
2017-11-28 17:50:30 +08:00
|
|
|
active_rpcs_.Remove(rpc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!ok) {
|
|
|
|
LOG(ERROR) << "Failed to establish connection for unknown reason.";
|
|
|
|
active_rpcs_.Remove(rpc);
|
|
|
|
}
|
|
|
|
|
2017-11-29 17:40:26 +08:00
|
|
|
if (ok) {
|
|
|
|
// For request-streaming RPCs ask the client to start sending requests.
|
|
|
|
rpc->RequestStreamingReadIfNeeded();
|
|
|
|
}
|
2017-11-28 17:50:30 +08:00
|
|
|
|
2017-11-29 17:40:26 +08:00
|
|
|
// Create new active rpc to handle next connection and register it for the
|
|
|
|
// incoming connection.
|
|
|
|
active_rpcs_.Add(rpc->Clone())->RequestNextMethodInvocation();
|
|
|
|
}
|
2017-11-28 17:50:30 +08:00
|
|
|
|
2017-11-29 17:40:26 +08:00
|
|
|
void Service::HandleRead(Rpc* rpc, bool ok) {
|
|
|
|
if (ok) {
|
|
|
|
rpc->OnRequest();
|
|
|
|
rpc->RequestStreamingReadIfNeeded();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reads completed.
|
|
|
|
rpc->OnReadsDone();
|
2017-12-04 22:28:19 +08:00
|
|
|
|
|
|
|
RemoveIfNotPending(rpc);
|
2017-11-28 17:50:30 +08:00
|
|
|
}
|
|
|
|
|
2017-11-29 17:40:26 +08:00
|
|
|
void Service::HandleWrite(Rpc* rpc, bool ok) {
|
|
|
|
if (!ok) {
|
|
|
|
LOG(ERROR) << "Write failed";
|
|
|
|
}
|
|
|
|
|
2017-12-04 22:28:19 +08:00
|
|
|
// Send the next message or potentially finish the connection.
|
|
|
|
rpc->PerformWriteIfNeeded();
|
|
|
|
|
2017-11-29 17:40:26 +08:00
|
|
|
RemoveIfNotPending(rpc);
|
|
|
|
}
|
|
|
|
|
2017-11-30 20:18:16 +08:00
|
|
|
void Service::HandleFinish(Rpc* rpc, bool ok) {
|
|
|
|
if (!ok) {
|
|
|
|
LOG(ERROR) << "Finish failed";
|
|
|
|
}
|
|
|
|
|
|
|
|
RemoveIfNotPending(rpc);
|
|
|
|
}
|
|
|
|
|
2017-11-29 17:40:26 +08:00
|
|
|
void Service::HandleDone(Rpc* rpc, bool ok) { RemoveIfNotPending(rpc); }
|
|
|
|
|
|
|
|
void Service::RemoveIfNotPending(Rpc* rpc) {
|
|
|
|
if (!rpc->GetRpcEvent(Rpc::Event::DONE)->pending &&
|
|
|
|
!rpc->GetRpcEvent(Rpc::Event::READ)->pending &&
|
2017-11-30 20:18:16 +08:00
|
|
|
!rpc->GetRpcEvent(Rpc::Event::WRITE)->pending &&
|
|
|
|
!rpc->GetRpcEvent(Rpc::Event::FINISH)->pending) {
|
2017-11-29 17:40:26 +08:00
|
|
|
active_rpcs_.Remove(rpc);
|
2017-11-25 06:41:58 +08:00
|
|
|
}
|
|
|
|
}
|
2017-11-23 22:37:30 +08:00
|
|
|
|
|
|
|
} // namespace framework
|
|
|
|
} // namespace cartographer_grpc
|