Deflake destruction and test grpc SerializeState (#1291)

This PR also removes various checks in ThreadPool which ensures that the ThreadPool task queue runs to completion before the destructor returns.
master
Christoph Schütte 2018-07-18 10:18:08 +02:00 committed by Wally B. Feed
parent 05a8314a23
commit 0668411d6f
2 changed files with 23 additions and 4 deletions

View File

@ -20,6 +20,9 @@
#include "cartographer/cloud/client/map_builder_stub.h" #include "cartographer/cloud/client/map_builder_stub.h"
#include "cartographer/cloud/internal/map_builder_server.h" #include "cartographer/cloud/internal/map_builder_server.h"
#include "cartographer/cloud/map_builder_server_options.h" #include "cartographer/cloud/map_builder_server_options.h"
#include "cartographer/io/internal/in_memory_proto_stream.h"
#include "cartographer/io/proto_stream.h"
#include "cartographer/io/proto_stream_deserializer.h"
#include "cartographer/io/testing/test_helpers.h" #include "cartographer/io/testing/test_helpers.h"
#include "cartographer/mapping/internal/testing/mock_map_builder.h" #include "cartographer/mapping/internal/testing/mock_map_builder.h"
#include "cartographer/mapping/internal/testing/mock_pose_graph.h" #include "cartographer/mapping/internal/testing/mock_pose_graph.h"
@ -449,6 +452,26 @@ TEST_F(ClientServerTest, LocalSlam2DWithUploadingServer) {
} }
WaitForLocalSlamResults(measurements.size()); WaitForLocalSlamResults(measurements.size());
WaitForLocalSlamResultUploads(number_of_insertion_results_); WaitForLocalSlamResultUploads(number_of_insertion_results_);
std::queue<std::unique_ptr<google::protobuf::Message>> chunks;
io::ForwardingProtoStreamWriter writer(
[&chunks](const google::protobuf::Message* proto) -> bool {
if (!proto) {
return true;
}
std::unique_ptr<google::protobuf::Message> p(proto->New());
p->CopyFrom(*proto);
chunks.push(std::move(p));
return true;
});
stub_->SerializeState(&writer);
CHECK(writer.Close());
// Ensure it can be read.
io::InMemoryProtoStreamReader reader(std::move(chunks));
io::ProtoStreamDeserializer deserializer(&reader);
EXPECT_EQ(deserializer.pose_graph().trajectory_size(), 1);
stub_for_uploading_server_->FinishTrajectory(trajectory_id); stub_for_uploading_server_->FinishTrajectory(trajectory_id);
EXPECT_EQ(local_slam_result_poses_.size(), measurements.size()); EXPECT_EQ(local_slam_result_poses_.size(), measurements.size());
EXPECT_NEAR(kTravelDistance, EXPECT_NEAR(kTravelDistance,

View File

@ -46,8 +46,6 @@ ThreadPool::~ThreadPool() {
MutexLocker locker(&mutex_); MutexLocker locker(&mutex_);
CHECK(running_); CHECK(running_);
running_ = false; running_ = false;
CHECK_EQ(task_queue_.size(), 0);
CHECK_EQ(tasks_not_ready_.size(), 0);
} }
for (std::thread& thread : pool_) { for (std::thread& thread : pool_) {
thread.join(); thread.join();
@ -56,7 +54,6 @@ ThreadPool::~ThreadPool() {
void ThreadPool::NotifyDependenciesCompleted(Task* task) { void ThreadPool::NotifyDependenciesCompleted(Task* task) {
MutexLocker locker(&mutex_); MutexLocker locker(&mutex_);
CHECK(running_);
auto it = tasks_not_ready_.find(task); auto it = tasks_not_ready_.find(task);
CHECK(it != tasks_not_ready_.end()); CHECK(it != tasks_not_ready_.end());
task_queue_.push_back(it->second); task_queue_.push_back(it->second);
@ -67,7 +64,6 @@ std::weak_ptr<Task> ThreadPool::Schedule(std::unique_ptr<Task> task) {
std::shared_ptr<Task> shared_task; std::shared_ptr<Task> shared_task;
{ {
MutexLocker locker(&mutex_); MutexLocker locker(&mutex_);
CHECK(running_);
auto insert_result = auto insert_result =
tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task))); tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
CHECK(insert_result.second) << "Schedule called twice"; CHECK(insert_result.second) << "Schedule called twice";