From c8f33ee853636b0b4da3fbfe719b35a923a3617f Mon Sep 17 00:00:00 2001 From: Damon Kohler Date: Thu, 17 Nov 2016 09:11:00 +0100 Subject: [PATCH] Drop sensor data until progress can be made. (#124) The sensor collator now finds a common starting time for all sensor data. For example, this prevents us from processing arbitrarily many IMU packets before our first scan match and makes it possible to auto pause and resume a trajectory in the event of sensor data interruption. --- cartographer/sensor/collator_test.cc | 38 ++++++++++++------- cartographer/sensor/ordered_multi_queue.h | 31 +++++++++++---- .../sensor/ordered_multi_queue_test.cc | 22 ++++++----- 3 files changed, 61 insertions(+), 30 deletions(-) diff --git a/cartographer/sensor/collator_test.cc b/cartographer/sensor/collator_test.cc index f7b7cce..4b6c18e 100644 --- a/cartographer/sensor/collator_test.cc +++ b/cartographer/sensor/collator_test.cc @@ -31,6 +31,7 @@ namespace { TEST(Collator, Ordering) { const std::array kSensorId = { {"horizontal_laser", "vertical_laser", "imu", "odometry"}}; + Data zero(common::FromUniversal(0), sensor::LaserFan{}); Data first(common::FromUniversal(100), sensor::LaserFan{}); Data second(common::FromUniversal(200), sensor::LaserFan{}); Data third(common::FromUniversal(300), Data::Imu{}); @@ -47,6 +48,17 @@ TEST(Collator, Ordering) { }); constexpr int kTrajectoryId = 0; + + // Establish a common start time. + collator.AddSensorData(kTrajectoryId, kSensorId[0], + common::make_unique(zero)); + collator.AddSensorData(kTrajectoryId, kSensorId[1], + common::make_unique(zero)); + collator.AddSensorData(kTrajectoryId, kSensorId[2], + common::make_unique(zero)); + collator.AddSensorData(kTrajectoryId, kSensorId[3], + common::make_unique(zero)); + collator.AddSensorData(kTrajectoryId, kSensorId[0], common::make_unique(first)); collator.AddSensorData(kTrajectoryId, kSensorId[3], @@ -60,22 +72,22 @@ TEST(Collator, Ordering) { collator.AddSensorData(kTrajectoryId, kSensorId[2], common::make_unique(third)); - ASSERT_EQ(3, received.size()); - EXPECT_EQ(100, common::ToUniversal(received[0].second.time)); - EXPECT_EQ(kSensorId[0], received[0].first); - EXPECT_EQ(200, common::ToUniversal(received[1].second.time)); - EXPECT_EQ(kSensorId[1], received[1].first); - EXPECT_EQ(300, common::ToUniversal(received[2].second.time)); - EXPECT_EQ(kSensorId[2], received[2].first); + ASSERT_EQ(7, received.size()); + EXPECT_EQ(100, common::ToUniversal(received[4].second.time)); + EXPECT_EQ(kSensorId[0], received[4].first); + EXPECT_EQ(200, common::ToUniversal(received[5].second.time)); + EXPECT_EQ(kSensorId[1], received[5].first); + EXPECT_EQ(300, common::ToUniversal(received[6].second.time)); + EXPECT_EQ(kSensorId[2], received[6].first); collator.Flush(); - ASSERT_EQ(6, received.size()); - EXPECT_EQ(kSensorId[0], received[3].first); - EXPECT_EQ(500, common::ToUniversal(received[4].second.time)); - EXPECT_EQ(kSensorId[1], received[4].first); - EXPECT_EQ(600, common::ToUniversal(received[5].second.time)); - EXPECT_EQ(kSensorId[3], received[5].first); + ASSERT_EQ(10, received.size()); + EXPECT_EQ(kSensorId[0], received[7].first); + EXPECT_EQ(500, common::ToUniversal(received[8].second.time)); + EXPECT_EQ(kSensorId[1], received[8].first); + EXPECT_EQ(600, common::ToUniversal(received[9].second.time)); + EXPECT_EQ(kSensorId[3], received[9].first); } } // namespace diff --git a/cartographer/sensor/ordered_multi_queue.h b/cartographer/sensor/ordered_multi_queue.h index 2a7cc29..30d7092 100644 --- a/cartographer/sensor/ordered_multi_queue.h +++ b/cartographer/sensor/ordered_multi_queue.h @@ -32,7 +32,8 @@ namespace cartographer { namespace sensor { -// Number of items that can be queued up before we LOG(WARNING). +// Number of items that can be queued up before we log which queues are waiting +// for data. const int kMaxQueueSize = 500; struct QueueKey { @@ -78,10 +79,6 @@ class OrderedMultiQueue { Dispatch(); } - bool HasQueue(const QueueKey& queue_key) { - return queues_.count(queue_key) != 0; - } - void Add(const QueueKey& queue_key, std::unique_ptr data) { auto* queue = FindOrNull(queue_key); if (queue == nullptr) { @@ -151,7 +148,7 @@ class OrderedMultiQueue { next_data = data; next_queue = &it->second; } - CHECK_LE(last_dispatched_key_, next_data->time) + CHECK_LE(last_dispatched_time_, next_data->time) << "Non-sorted data added to queue: '" << it->first << "'"; ++it; } @@ -159,7 +156,24 @@ class OrderedMultiQueue { CHECK(queues_.empty()); return; } - last_dispatched_key_ = next_data->time; + + // If we haven't dispatched any data yet, fast forward all queues until a + // common start time has been reached. + if (common_start_time_ == common::Time::min()) { + for (auto& entry : queues_) { + common_start_time_ = + std::max(common_start_time_, entry.second.queue.Peek()->time); + } + LOG(INFO) << "All sensor data is available starting at '" + << common_start_time_ << "'."; + } + + if (next_data->time < common_start_time_) { + next_queue->queue.Pop(); + continue; + } + + last_dispatched_time_ = next_data->time; next_queue->callback(next_queue->queue.Pop()); } } @@ -186,7 +200,8 @@ class OrderedMultiQueue { } // Used to verify that values are dispatched in sorted order. - common::Time last_dispatched_key_ = common::Time::min(); + common::Time last_dispatched_time_ = common::Time::min(); + common::Time common_start_time_ = common::Time::min(); std::map queues_; }; diff --git a/cartographer/sensor/ordered_multi_queue_test.cc b/cartographer/sensor/ordered_multi_queue_test.cc index 85bd50b..874d2a6 100644 --- a/cartographer/sensor/ordered_multi_queue_test.cc +++ b/cartographer/sensor/ordered_multi_queue_test.cc @@ -27,15 +27,16 @@ namespace { class OrderedMultiQueueTest : public ::testing::Test { protected: - const QueueKey kFirst{1, "foo"}; - const QueueKey kSecond{1, "bar"}; - const QueueKey kThird{2, "bar"}; + // These are keys are chosen so that they sort first, second, third. + const QueueKey kFirst{1, "a"}; + const QueueKey kSecond{1, "b"}; + const QueueKey kThird{2, "b"}; void SetUp() { for (const auto& queue_key : {kFirst, kSecond, kThird}) { queue_.AddQueue(queue_key, [this](std::unique_ptr data) { if (!values_.empty()) { - EXPECT_GT(data->time, values_.back().time); + EXPECT_GE(data->time, values_.back().time); } values_.push_back(*data); }); @@ -53,23 +54,26 @@ class OrderedMultiQueueTest : public ::testing::Test { }; TEST_F(OrderedMultiQueueTest, Ordering) { + queue_.Add(kFirst, MakeImu(0)); queue_.Add(kFirst, MakeImu(4)); queue_.Add(kFirst, MakeImu(5)); queue_.Add(kFirst, MakeImu(6)); EXPECT_TRUE(values_.empty()); + queue_.Add(kSecond, MakeImu(0)); queue_.Add(kSecond, MakeImu(1)); EXPECT_TRUE(values_.empty()); + queue_.Add(kThird, MakeImu(0)); queue_.Add(kThird, MakeImu(2)); - EXPECT_EQ(values_.size(), 1); + EXPECT_EQ(values_.size(), 4); queue_.Add(kSecond, MakeImu(3)); - EXPECT_EQ(values_.size(), 2); + EXPECT_EQ(values_.size(), 5); queue_.Add(kSecond, MakeImu(7)); queue_.Add(kThird, MakeImu(8)); queue_.Flush(); - EXPECT_EQ(8, values_.size()); - for (size_t i = 0; i < values_.size(); ++i) { - EXPECT_EQ(i + 1, common::ToUniversal(values_[i].time)); + EXPECT_EQ(11, values_.size()); + for (size_t i = 0; i < values_.size() - 1; ++i) { + EXPECT_LE(values_[i].time, values_[i + 1].time); } }