Start DataUploader implementation, implement Add/FinishTrajectory ()

[RFC=0002](https://github.com/googlecartographer/rfcs/blob/master/text/0002-cloud-based-mapping-1.md)
master
Christoph Schütte 2018-01-12 23:16:29 +01:00 committed by Wally B. Feed
parent 057bd8ce6a
commit f49e798ef9
10 changed files with 102 additions and 14 deletions

View File

@ -99,12 +99,9 @@ class ClientServerTest : public ::testing::Test {
include "map_builder_server.lua"
MAP_BUILDER.use_trajectory_builder_2d = true
MAP_BUILDER.pose_graph.optimize_every_n_nodes = 0
MAP_BUILDER_SERVER = {
server_address = "0.0.0.0:50051",
num_event_threads = 1,
num_grpc_threads = 1,
map_builder = MAP_BUILDER,
}
MAP_BUILDER_SERVER.num_event_threads = 1
MAP_BUILDER_SERVER.num_grpc_threads = 1
MAP_BUILDER_SERVER.uplink_server_address = ""
return MAP_BUILDER_SERVER)text";
auto map_builder_server_parameters =
cartographer::mapping::test::ResolveLuaParameters(kMapBuilderServerLua);

View File

@ -16,8 +16,46 @@
#include "cartographer_grpc/data_uploader.h"
#include "glog/logging.h"
namespace cartographer_grpc {
DataUploader::DataUploader(const std::string &data_uploader) {}
DataUploader::DataUploader(const std::string& server_address)
: client_channel_(grpc::CreateChannel(server_address,
grpc::InsecureChannelCredentials())),
service_stub_(proto::MapBuilderService::NewStub(client_channel_)) {}
} // namespace cartographer_grpc
void DataUploader::AddTrajectory(
int local_trajectory_id,
const std::unordered_set<std::string>& expected_sensor_ids,
const cartographer::mapping::proto::TrajectoryBuilderOptions&
trajectory_options) {
grpc::ClientContext client_context;
proto::AddTrajectoryRequest request;
proto::AddTrajectoryResponse result;
*request.mutable_trajectory_builder_options() = trajectory_options;
for (const auto& sensor_id : expected_sensor_ids) {
*request.add_expected_sensor_ids() = sensor_id;
}
grpc::Status status =
service_stub_->AddTrajectory(&client_context, request, &result);
CHECK(status.ok());
CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 0);
local_to_cloud_trajectory_id_map_[local_trajectory_id] =
result.trajectory_id();
}
void DataUploader::FinishTrajectory(int local_trajectory_id) {
CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 1);
int cloud_trajectory_id =
local_to_cloud_trajectory_id_map_[local_trajectory_id];
grpc::ClientContext client_context;
proto::FinishTrajectoryRequest request;
google::protobuf::Empty response;
request.set_trajectory_id(cloud_trajectory_id);
grpc::Status status =
service_stub_->FinishTrajectory(&client_context, request, &response);
CHECK(status.ok());
}
} // namespace cartographer_grpc

View File

@ -17,13 +17,30 @@
#ifndef CARTOGRAPHER_GRPC_DATA_UPLOADER_H
#define CARTOGRAPHER_GRPC_DATA_UPLOADER_H
#include <map>
#include <string>
#include <unordered_set>
#include "cartographer/mapping/proto/trajectory_builder_options.pb.h"
#include "cartographer_grpc/proto/map_builder_service.grpc.pb.h"
#include "grpc++/grpc++.h"
namespace cartographer_grpc {
class DataUploader {
public:
DataUploader(const std::string &server_address);
DataUploader(const std::string& server_address);
void AddTrajectory(
int local_trajectory_id,
const std::unordered_set<std::string>& expected_sensor_ids,
const cartographer::mapping::proto::TrajectoryBuilderOptions&
trajectory_options);
void FinishTrajectory(int local_trajectory_id);
private:
std::shared_ptr<grpc::Channel> client_channel_;
std::unique_ptr<proto::MapBuilderService::Stub> service_stub_;
std::map<int, int> local_to_cloud_trajectory_id_map_;
};
} // namespace cartographer_grpc

View File

@ -33,14 +33,31 @@ class AddTrajectoryHandler
auto local_slam_result_callback =
GetUnsynchronizedContext<MapBuilderServer::MapBuilderContext>()
->GetLocalSlamResultCallbackForSubscriptions();
std::unordered_set<std::string> expected_sensor_ids(
request.expected_sensor_ids().begin(),
request.expected_sensor_ids().end());
const int trajectory_id =
GetContext<MapBuilderServer::MapBuilderContext>()
->map_builder()
.AddTrajectoryBuilder(std::unordered_set<std::string>(
request.expected_sensor_ids().begin(),
request.expected_sensor_ids().end()),
.AddTrajectoryBuilder(expected_sensor_ids,
request.trajectory_builder_options(),
local_slam_result_callback);
if (GetUnsynchronizedContext<MapBuilderServer::MapBuilderContext>()
->data_uploader()) {
auto trajectory_builder_options = request.trajectory_builder_options();
// Clear the trajectory builder options to convey to the cloud
// Cartographer instance that does not need to instantiate a
// 'LocalTrajectoryBuilder'.
trajectory_builder_options.clear_trajectory_builder_2d_options();
trajectory_builder_options.clear_trajectory_builder_3d_options();
GetContext<MapBuilderServer::MapBuilderContext>()
->data_uploader()
->AddTrajectory(trajectory_id, expected_sensor_ids,
trajectory_builder_options);
}
auto response =
cartographer::common::make_unique<proto::AddTrajectoryResponse>();
response->set_trajectory_id(trajectory_id);

View File

@ -36,6 +36,12 @@ class FinishTrajectoryHandler
.FinishTrajectory(request.trajectory_id());
GetUnsynchronizedContext<MapBuilderServer::MapBuilderContext>()
->NotifyFinishTrajectory(request.trajectory_id());
if (GetUnsynchronizedContext<MapBuilderServer::MapBuilderContext>()
->data_uploader()) {
GetContext<MapBuilderServer::MapBuilderContext>()
->data_uploader()
->FinishTrajectory(request.trajectory_id());
}
Send(cartographer::common::make_unique<google::protobuf::Empty>());
}
};

View File

@ -207,6 +207,10 @@ MapBuilderServer::MapBuilderServer(
map_builder_server_options.num_grpc_threads());
server_builder.SetNumEventThreads(
map_builder_server_options.num_event_threads());
if (!map_builder_server_options.uplink_server_address().empty()) {
data_uploader_ = cartographer::common::make_unique<DataUploader>(
map_builder_server_options.uplink_server_address());
}
server_builder.RegisterHandler<handlers::AddTrajectoryHandler,
proto::MapBuilderService>("AddTrajectory");
server_builder.RegisterHandler<handlers::AddOdometryDataHandler,

View File

@ -23,6 +23,7 @@
#include "cartographer/mapping/map_builder.h"
#include "cartographer/mapping/trajectory_builder_interface.h"
#include "cartographer/sensor/dispatchable.h"
#include "cartographer_grpc/data_uploader.h"
#include "cartographer_grpc/framework/execution_context.h"
#include "cartographer_grpc/framework/server.h"
#include "cartographer_grpc/proto/map_builder_server_options.pb.h"
@ -69,6 +70,9 @@ class MapBuilderServer {
ProcessLocalSlamResultData(
const std::string& sensor_id, cartographer::common::Time time,
const cartographer::mapping::proto::LocalSlamResultData& proto);
DataUploader* data_uploader() {
return map_builder_server_->data_uploader_.get();
}
template <typename DataType>
void EnqueueSensorData(int trajectory_id, const std::string& sensor_id,
@ -150,6 +154,7 @@ class MapBuilderServer {
int current_subscription_index_ = 0;
std::map<int /* trajectory ID */, LocalSlamResultHandlerSubscriptions>
local_slam_subscriptions_ GUARDED_BY(local_slam_subscriptions_lock_);
std::unique_ptr<DataUploader> data_uploader_;
};
} // namespace cartographer_grpc

View File

@ -34,6 +34,8 @@ proto::MapBuilderServerOptions CreateMapBuilderServerOptions(
*map_builder_server_options.mutable_map_builder_options() =
cartographer::mapping::CreateMapBuilderOptions(
lua_parameter_dictionary->GetDictionary("map_builder").get());
map_builder_server_options.set_uplink_server_address(
lua_parameter_dictionary->GetString("uplink_server_address"));
return map_builder_server_options;
}

View File

@ -23,4 +23,5 @@ message MapBuilderServerOptions {
int32 num_grpc_threads = 2;
int32 num_event_threads = 3;
cartographer.mapping.proto.MapBuilderOptions map_builder_options = 4;
string uplink_server_address = 5;
}

View File

@ -15,8 +15,9 @@
include "map_builder.lua"
MAP_BUILDER_SERVER = {
server_address = "0.0.0.0:50051",
map_builder = MAP_BUILDER,
num_event_threads = 4,
num_grpc_threads = 4,
map_builder = MAP_BUILDER,
server_address = "0.0.0.0:50051",
uplink_server_address = "localhost:50052",
}