Drop sensor data until progress can be made. (#124)
The sensor collator now finds a common starting time for all sensor data. For example, this prevents us from processing arbitrarily many IMU packets before our first scan match and makes it possible to auto pause and resume a trajectory in the event of sensor data interruption.master
parent
6777654202
commit
c8f33ee853
|
@ -31,6 +31,7 @@ namespace {
|
||||||
TEST(Collator, Ordering) {
|
TEST(Collator, Ordering) {
|
||||||
const std::array<string, 4> kSensorId = {
|
const std::array<string, 4> kSensorId = {
|
||||||
{"horizontal_laser", "vertical_laser", "imu", "odometry"}};
|
{"horizontal_laser", "vertical_laser", "imu", "odometry"}};
|
||||||
|
Data zero(common::FromUniversal(0), sensor::LaserFan{});
|
||||||
Data first(common::FromUniversal(100), sensor::LaserFan{});
|
Data first(common::FromUniversal(100), sensor::LaserFan{});
|
||||||
Data second(common::FromUniversal(200), sensor::LaserFan{});
|
Data second(common::FromUniversal(200), sensor::LaserFan{});
|
||||||
Data third(common::FromUniversal(300), Data::Imu{});
|
Data third(common::FromUniversal(300), Data::Imu{});
|
||||||
|
@ -47,6 +48,17 @@ TEST(Collator, Ordering) {
|
||||||
});
|
});
|
||||||
|
|
||||||
constexpr int kTrajectoryId = 0;
|
constexpr int kTrajectoryId = 0;
|
||||||
|
|
||||||
|
// Establish a common start time.
|
||||||
|
collator.AddSensorData(kTrajectoryId, kSensorId[0],
|
||||||
|
common::make_unique<Data>(zero));
|
||||||
|
collator.AddSensorData(kTrajectoryId, kSensorId[1],
|
||||||
|
common::make_unique<Data>(zero));
|
||||||
|
collator.AddSensorData(kTrajectoryId, kSensorId[2],
|
||||||
|
common::make_unique<Data>(zero));
|
||||||
|
collator.AddSensorData(kTrajectoryId, kSensorId[3],
|
||||||
|
common::make_unique<Data>(zero));
|
||||||
|
|
||||||
collator.AddSensorData(kTrajectoryId, kSensorId[0],
|
collator.AddSensorData(kTrajectoryId, kSensorId[0],
|
||||||
common::make_unique<Data>(first));
|
common::make_unique<Data>(first));
|
||||||
collator.AddSensorData(kTrajectoryId, kSensorId[3],
|
collator.AddSensorData(kTrajectoryId, kSensorId[3],
|
||||||
|
@ -60,22 +72,22 @@ TEST(Collator, Ordering) {
|
||||||
collator.AddSensorData(kTrajectoryId, kSensorId[2],
|
collator.AddSensorData(kTrajectoryId, kSensorId[2],
|
||||||
common::make_unique<Data>(third));
|
common::make_unique<Data>(third));
|
||||||
|
|
||||||
ASSERT_EQ(3, received.size());
|
ASSERT_EQ(7, received.size());
|
||||||
EXPECT_EQ(100, common::ToUniversal(received[0].second.time));
|
EXPECT_EQ(100, common::ToUniversal(received[4].second.time));
|
||||||
EXPECT_EQ(kSensorId[0], received[0].first);
|
EXPECT_EQ(kSensorId[0], received[4].first);
|
||||||
EXPECT_EQ(200, common::ToUniversal(received[1].second.time));
|
EXPECT_EQ(200, common::ToUniversal(received[5].second.time));
|
||||||
EXPECT_EQ(kSensorId[1], received[1].first);
|
EXPECT_EQ(kSensorId[1], received[5].first);
|
||||||
EXPECT_EQ(300, common::ToUniversal(received[2].second.time));
|
EXPECT_EQ(300, common::ToUniversal(received[6].second.time));
|
||||||
EXPECT_EQ(kSensorId[2], received[2].first);
|
EXPECT_EQ(kSensorId[2], received[6].first);
|
||||||
|
|
||||||
collator.Flush();
|
collator.Flush();
|
||||||
|
|
||||||
ASSERT_EQ(6, received.size());
|
ASSERT_EQ(10, received.size());
|
||||||
EXPECT_EQ(kSensorId[0], received[3].first);
|
EXPECT_EQ(kSensorId[0], received[7].first);
|
||||||
EXPECT_EQ(500, common::ToUniversal(received[4].second.time));
|
EXPECT_EQ(500, common::ToUniversal(received[8].second.time));
|
||||||
EXPECT_EQ(kSensorId[1], received[4].first);
|
EXPECT_EQ(kSensorId[1], received[8].first);
|
||||||
EXPECT_EQ(600, common::ToUniversal(received[5].second.time));
|
EXPECT_EQ(600, common::ToUniversal(received[9].second.time));
|
||||||
EXPECT_EQ(kSensorId[3], received[5].first);
|
EXPECT_EQ(kSensorId[3], received[9].first);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
|
@ -32,7 +32,8 @@
|
||||||
namespace cartographer {
|
namespace cartographer {
|
||||||
namespace sensor {
|
namespace sensor {
|
||||||
|
|
||||||
// Number of items that can be queued up before we LOG(WARNING).
|
// Number of items that can be queued up before we log which queues are waiting
|
||||||
|
// for data.
|
||||||
const int kMaxQueueSize = 500;
|
const int kMaxQueueSize = 500;
|
||||||
|
|
||||||
struct QueueKey {
|
struct QueueKey {
|
||||||
|
@ -78,10 +79,6 @@ class OrderedMultiQueue {
|
||||||
Dispatch();
|
Dispatch();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool HasQueue(const QueueKey& queue_key) {
|
|
||||||
return queues_.count(queue_key) != 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void Add(const QueueKey& queue_key, std::unique_ptr<Data> data) {
|
void Add(const QueueKey& queue_key, std::unique_ptr<Data> data) {
|
||||||
auto* queue = FindOrNull(queue_key);
|
auto* queue = FindOrNull(queue_key);
|
||||||
if (queue == nullptr) {
|
if (queue == nullptr) {
|
||||||
|
@ -151,7 +148,7 @@ class OrderedMultiQueue {
|
||||||
next_data = data;
|
next_data = data;
|
||||||
next_queue = &it->second;
|
next_queue = &it->second;
|
||||||
}
|
}
|
||||||
CHECK_LE(last_dispatched_key_, next_data->time)
|
CHECK_LE(last_dispatched_time_, next_data->time)
|
||||||
<< "Non-sorted data added to queue: '" << it->first << "'";
|
<< "Non-sorted data added to queue: '" << it->first << "'";
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
|
@ -159,7 +156,24 @@ class OrderedMultiQueue {
|
||||||
CHECK(queues_.empty());
|
CHECK(queues_.empty());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
last_dispatched_key_ = next_data->time;
|
|
||||||
|
// If we haven't dispatched any data yet, fast forward all queues until a
|
||||||
|
// common start time has been reached.
|
||||||
|
if (common_start_time_ == common::Time::min()) {
|
||||||
|
for (auto& entry : queues_) {
|
||||||
|
common_start_time_ =
|
||||||
|
std::max(common_start_time_, entry.second.queue.Peek<Data>()->time);
|
||||||
|
}
|
||||||
|
LOG(INFO) << "All sensor data is available starting at '"
|
||||||
|
<< common_start_time_ << "'.";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (next_data->time < common_start_time_) {
|
||||||
|
next_queue->queue.Pop();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
last_dispatched_time_ = next_data->time;
|
||||||
next_queue->callback(next_queue->queue.Pop());
|
next_queue->callback(next_queue->queue.Pop());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -186,7 +200,8 @@ class OrderedMultiQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used to verify that values are dispatched in sorted order.
|
// Used to verify that values are dispatched in sorted order.
|
||||||
common::Time last_dispatched_key_ = common::Time::min();
|
common::Time last_dispatched_time_ = common::Time::min();
|
||||||
|
common::Time common_start_time_ = common::Time::min();
|
||||||
|
|
||||||
std::map<QueueKey, Queue> queues_;
|
std::map<QueueKey, Queue> queues_;
|
||||||
};
|
};
|
||||||
|
|
|
@ -27,15 +27,16 @@ namespace {
|
||||||
|
|
||||||
class OrderedMultiQueueTest : public ::testing::Test {
|
class OrderedMultiQueueTest : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
const QueueKey kFirst{1, "foo"};
|
// These are keys are chosen so that they sort first, second, third.
|
||||||
const QueueKey kSecond{1, "bar"};
|
const QueueKey kFirst{1, "a"};
|
||||||
const QueueKey kThird{2, "bar"};
|
const QueueKey kSecond{1, "b"};
|
||||||
|
const QueueKey kThird{2, "b"};
|
||||||
|
|
||||||
void SetUp() {
|
void SetUp() {
|
||||||
for (const auto& queue_key : {kFirst, kSecond, kThird}) {
|
for (const auto& queue_key : {kFirst, kSecond, kThird}) {
|
||||||
queue_.AddQueue(queue_key, [this](std::unique_ptr<Data> data) {
|
queue_.AddQueue(queue_key, [this](std::unique_ptr<Data> data) {
|
||||||
if (!values_.empty()) {
|
if (!values_.empty()) {
|
||||||
EXPECT_GT(data->time, values_.back().time);
|
EXPECT_GE(data->time, values_.back().time);
|
||||||
}
|
}
|
||||||
values_.push_back(*data);
|
values_.push_back(*data);
|
||||||
});
|
});
|
||||||
|
@ -53,23 +54,26 @@ class OrderedMultiQueueTest : public ::testing::Test {
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST_F(OrderedMultiQueueTest, Ordering) {
|
TEST_F(OrderedMultiQueueTest, Ordering) {
|
||||||
|
queue_.Add(kFirst, MakeImu(0));
|
||||||
queue_.Add(kFirst, MakeImu(4));
|
queue_.Add(kFirst, MakeImu(4));
|
||||||
queue_.Add(kFirst, MakeImu(5));
|
queue_.Add(kFirst, MakeImu(5));
|
||||||
queue_.Add(kFirst, MakeImu(6));
|
queue_.Add(kFirst, MakeImu(6));
|
||||||
EXPECT_TRUE(values_.empty());
|
EXPECT_TRUE(values_.empty());
|
||||||
|
queue_.Add(kSecond, MakeImu(0));
|
||||||
queue_.Add(kSecond, MakeImu(1));
|
queue_.Add(kSecond, MakeImu(1));
|
||||||
EXPECT_TRUE(values_.empty());
|
EXPECT_TRUE(values_.empty());
|
||||||
|
queue_.Add(kThird, MakeImu(0));
|
||||||
queue_.Add(kThird, MakeImu(2));
|
queue_.Add(kThird, MakeImu(2));
|
||||||
EXPECT_EQ(values_.size(), 1);
|
EXPECT_EQ(values_.size(), 4);
|
||||||
queue_.Add(kSecond, MakeImu(3));
|
queue_.Add(kSecond, MakeImu(3));
|
||||||
EXPECT_EQ(values_.size(), 2);
|
EXPECT_EQ(values_.size(), 5);
|
||||||
queue_.Add(kSecond, MakeImu(7));
|
queue_.Add(kSecond, MakeImu(7));
|
||||||
queue_.Add(kThird, MakeImu(8));
|
queue_.Add(kThird, MakeImu(8));
|
||||||
queue_.Flush();
|
queue_.Flush();
|
||||||
|
|
||||||
EXPECT_EQ(8, values_.size());
|
EXPECT_EQ(11, values_.size());
|
||||||
for (size_t i = 0; i < values_.size(); ++i) {
|
for (size_t i = 0; i < values_.size() - 1; ++i) {
|
||||||
EXPECT_EQ(i + 1, common::ToUniversal(values_[i].time));
|
EXPECT_LE(values_[i].time, values_[i + 1].time);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue