Moves time into sensor::Data. (#76)

master
Damon Kohler 2016-10-18 16:07:18 +02:00 committed by GitHub
parent 534c627c28
commit fddb1e32a0
5 changed files with 84 additions and 98 deletions

View File

@ -37,7 +37,7 @@ namespace sensor {
class Collator { class Collator {
public: public:
using Callback = std::function<void(int64, std::unique_ptr<Data>)>; using Callback = std::function<void(std::unique_ptr<Data>)>;
Collator() {} Collator() {}
@ -51,9 +51,8 @@ class Collator {
const Callback callback) { const Callback callback) {
for (const auto& sensor_id : expected_sensor_ids) { for (const auto& sensor_id : expected_sensor_ids) {
const auto queue_key = QueueKey{trajectory_id, sensor_id}; const auto queue_key = QueueKey{trajectory_id, sensor_id};
queue_.AddQueue(queue_key, [callback](const common::Time time, queue_.AddQueue(queue_key, [callback](std::unique_ptr<Data> data) {
std::unique_ptr<Data> data) { callback(std::move(data));
callback(common::ToUniversal(time), std::move(data));
}); });
queue_keys_[trajectory_id].push_back(queue_key); queue_keys_[trajectory_id].push_back(queue_key);
} }
@ -69,12 +68,11 @@ class Collator {
// Adds 'data' for 'trajectory_id' to be collated. 'data' must contain valid // Adds 'data' for 'trajectory_id' to be collated. 'data' must contain valid
// sensor data. Sensor packets with matching 'sensor_id' must be added in time // sensor data. Sensor packets with matching 'sensor_id' must be added in time
// order. // order.
void AddSensorData(const int trajectory_id, const int64 timestamp, void AddSensorData(const int trajectory_id, const string& sensor_id,
const string& sensor_id, std::unique_ptr<Data> data) { std::unique_ptr<Data> data) {
sensor_packet_period_histogram_builder_.Add(trajectory_id, timestamp, sensor_packet_period_histogram_builder_.Add(
sensor_id); trajectory_id, common::ToUniversal(data->time), sensor_id);
queue_.Add(QueueKey{trajectory_id, sensor_id}, queue_.Add(QueueKey{trajectory_id, sensor_id}, std::move(data));
common::FromUniversal(timestamp), std::move(data));
} }
// Dispatches all queued sensor packets. May only be called once. // Dispatches all queued sensor packets. May only be called once.

View File

@ -29,52 +29,46 @@ namespace sensor {
namespace { namespace {
TEST(Collator, Ordering) { TEST(Collator, Ordering) {
Data first("horizontal_laser", sensor::LaserFan{}); Data first(common::FromUniversal(100), "horizontal_laser", sensor::LaserFan{});
Data second("vertical_laser", sensor::LaserFan{}); Data second(common::FromUniversal(200),"vertical_laser", sensor::LaserFan{});
Data third("imu", Data::Imu{}); Data third(common::FromUniversal(300),"imu", Data::Imu{});
Data fourth("horizontal_laser", sensor::LaserFan{}); Data fourth(common::FromUniversal(400),"horizontal_laser", sensor::LaserFan{});
Data fifth("vertical_laser", sensor::LaserFan{}); Data fifth(common::FromUniversal(500),"vertical_laser", sensor::LaserFan{});
Data sixth("odometry", Data::Odometry{}); Data sixth(common::FromUniversal(600),"odometry", Data::Odometry{});
const std::unordered_set<string> frame_ids = { const std::unordered_set<string> frame_ids = {
"horizontal_laser", "vertical_laser", "imu", "odometry"}; "horizontal_laser", "vertical_laser", "imu", "odometry"};
std::vector<std::pair<int64, Data>> received; std::vector<Data> received;
Collator collator; Collator collator;
collator.AddTrajectory( collator.AddTrajectory(
0, frame_ids, 0, frame_ids,
[&received](const int64 timestamp, std::unique_ptr<Data> packet) { [&received](std::unique_ptr<Data> data) {
received.push_back(std::make_pair(timestamp, *packet)); received.push_back(*data);
}); });
collator.AddSensorData(0, 100, first.frame_id, collator.AddSensorData(0, first.frame_id, common::make_unique<Data>(first));
common::make_unique<Data>(first)); collator.AddSensorData(0, sixth.frame_id, common::make_unique<Data>(sixth));
collator.AddSensorData(0, 600, sixth.frame_id, collator.AddSensorData(0, fourth.frame_id, common::make_unique<Data>(fourth));
common::make_unique<Data>(sixth)); collator.AddSensorData(0, second.frame_id, common::make_unique<Data>(second));
collator.AddSensorData(0, 400, fourth.frame_id, collator.AddSensorData(0, fifth.frame_id, common::make_unique<Data>(fifth));
common::make_unique<Data>(fourth)); collator.AddSensorData(0, third.frame_id, common::make_unique<Data>(third));
collator.AddSensorData(0, 200, second.frame_id,
common::make_unique<Data>(second));
collator.AddSensorData(0, 500, fifth.frame_id,
common::make_unique<Data>(fifth));
collator.AddSensorData(0, 300, third.frame_id,
common::make_unique<Data>(third));
EXPECT_EQ(3, received.size()); EXPECT_EQ(3, received.size());
EXPECT_EQ(100, received[0].first); EXPECT_EQ(100, common::ToUniversal(received[0].time));
EXPECT_EQ("horizontal_laser", received[0].second.frame_id); EXPECT_EQ("horizontal_laser", received[0].frame_id);
EXPECT_EQ(200, received[1].first); EXPECT_EQ(200, common::ToUniversal(received[1].time));
EXPECT_EQ("vertical_laser", received[1].second.frame_id); EXPECT_EQ("vertical_laser", received[1].frame_id);
EXPECT_EQ(300, received[2].first); EXPECT_EQ(300, common::ToUniversal(received[2].time));
EXPECT_EQ("imu", received[2].second.frame_id); EXPECT_EQ("imu", received[2].frame_id);
collator.Flush(); collator.Flush();
ASSERT_EQ(6, received.size()); ASSERT_EQ(6, received.size());
EXPECT_EQ("horizontal_laser", received[3].second.frame_id); EXPECT_EQ("horizontal_laser", received[3].frame_id);
EXPECT_EQ(500, received[4].first); EXPECT_EQ(500, common::ToUniversal(received[4].time));
EXPECT_EQ("vertical_laser", received[4].second.frame_id); EXPECT_EQ("vertical_laser", received[4].frame_id);
EXPECT_EQ(600, received[5].first); EXPECT_EQ(600, common::ToUniversal(received[5].time));
EXPECT_EQ("odometry", received[5].second.frame_id); EXPECT_EQ("odometry", received[5].frame_id);
} }
} // namespace } // namespace

View File

@ -19,6 +19,7 @@
#include <string> #include <string>
#include "cartographer/common/time.h"
#include "cartographer/kalman_filter/pose_tracker.h" #include "cartographer/kalman_filter/pose_tracker.h"
#include "cartographer/sensor/laser.h" #include "cartographer/sensor/laser.h"
#include "cartographer/transform/rigid_transform.h" #include "cartographer/transform/rigid_transform.h"
@ -42,17 +43,25 @@ struct Data {
Eigen::Vector3d angular_velocity; Eigen::Vector3d angular_velocity;
}; };
Data(const string& frame_id, const Imu& imu) Data(const common::Time time, const string& frame_id, const Imu& imu)
: type(Type::kImu), frame_id(frame_id), imu(imu) {} : type(Type::kImu), time(time), frame_id(frame_id), imu(imu) {}
Data(const string& frame_id, Data(const common::Time time, const string& frame_id,
const ::cartographer::sensor::LaserFan& laser_fan) const ::cartographer::sensor::LaserFan& laser_fan)
: type(Type::kLaserFan), frame_id(frame_id), laser_fan(laser_fan) {} : type(Type::kLaserFan),
time(time),
frame_id(frame_id),
laser_fan(laser_fan) {}
Data(const string& frame_id, const Odometry& odometry) Data(const common::Time time, const string& frame_id,
: type(Type::kOdometry), frame_id(frame_id), odometry(odometry) {} const Odometry& odometry)
: type(Type::kOdometry),
time(time),
frame_id(frame_id),
odometry(odometry) {}
Type type; Type type;
common::Time time;
string frame_id; string frame_id;
Imu imu; Imu imu;
sensor::LaserFan laser_fan; sensor::LaserFan laser_fan;

View File

@ -51,9 +51,9 @@ inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) {
// sorted order. This class is thread-compatible. // sorted order. This class is thread-compatible.
class OrderedMultiQueue { class OrderedMultiQueue {
public: public:
using Callback = std::function<void(common::Time, std::unique_ptr<Data>)>; using Callback = std::function<void(std::unique_ptr<Data>)>;
// Will wait to see at least one value for each 'expected_queue_keys' before // Will wait to see at least one value for each unfinished queue before
// dispatching the next smallest value across all queues. // dispatching the next smallest value across all queues.
OrderedMultiQueue() {} OrderedMultiQueue() {}
~OrderedMultiQueue() {} ~OrderedMultiQueue() {}
@ -74,17 +74,14 @@ class OrderedMultiQueue {
return queues_.count(queue_key) != 0; return queues_.count(queue_key) != 0;
} }
void Add(const QueueKey& queue_key, const common::Time& sort_key, void Add(const QueueKey& queue_key, std::unique_ptr<Data> data) {
std::unique_ptr<Data> value) {
auto* queue = FindOrNull(queue_key); auto* queue = FindOrNull(queue_key);
if (queue == nullptr) { if (queue == nullptr) {
// TODO(damonkohler): This will not work for every value of "queue_key". LOG_EVERY_N(WARNING, 1000) << "Ignored data for queue: '" << queue_key
LOG_EVERY_N(WARNING, 1000) << "Ignored value for queue: '" << queue_key
<< "'"; << "'";
return; return;
} }
queue->queue.Push( queue->queue.Push(std::move(data));
common::make_unique<KeyValuePair>(sort_key, std::move(value)));
Dispatch(); Dispatch();
} }
@ -108,15 +105,8 @@ class OrderedMultiQueue {
} }
private: private:
struct KeyValuePair {
KeyValuePair(const common::Time& sort_key, std::unique_ptr<Data> value)
: sort_key(sort_key), value(std::move(value)) {}
common::Time sort_key;
std::unique_ptr<Data> value;
};
struct Queue { struct Queue {
common::BlockingQueue<std::unique_ptr<KeyValuePair>> queue; common::BlockingQueue<std::unique_ptr<Data>> queue;
Callback callback; Callback callback;
bool finished = false; bool finished = false;
}; };
@ -140,11 +130,11 @@ class OrderedMultiQueue {
void Dispatch() { void Dispatch() {
while (true) { while (true) {
Queue* next_queue = nullptr; Queue* next_queue = nullptr;
const KeyValuePair* next_key_value_pair = nullptr; const Data* next_data = nullptr;
for (auto it = queues_.begin(); it != queues_.end();) { for (auto it = queues_.begin(); it != queues_.end();) {
auto& queue = it->second.queue; auto& queue = it->second.queue;
const auto* key_value_pair = queue.template Peek<KeyValuePair>(); const auto* data = queue.Peek<Data>();
if (key_value_pair == nullptr) { if (data == nullptr) {
if (it->second.finished) { if (it->second.finished) {
queues_.erase(it++); queues_.erase(it++);
continue; continue;
@ -152,24 +142,22 @@ class OrderedMultiQueue {
CannotMakeProgress(); CannotMakeProgress();
return; return;
} }
if (next_key_value_pair == nullptr || if (next_data == nullptr ||
std::forward_as_tuple(key_value_pair->sort_key, it->first) < std::forward_as_tuple(data->time, it->first) <
std::forward_as_tuple(next_key_value_pair->sort_key, std::forward_as_tuple(next_data->time, it->first)) {
it->first)) { next_data = data;
next_key_value_pair = key_value_pair;
next_queue = &it->second; next_queue = &it->second;
} }
CHECK_LE(last_dispatched_key_, next_key_value_pair->sort_key) CHECK_LE(last_dispatched_key_, next_data->time)
<< "Non-sorted values added to queue: '" << it->first << "'"; << "Non-sorted data added to queue: '" << it->first << "'";
++it; ++it;
} }
if (next_key_value_pair == nullptr) { if (next_data == nullptr) {
CHECK(queues_.empty()); CHECK(queues_.empty());
return; return;
} }
last_dispatched_key_ = next_key_value_pair->sort_key; last_dispatched_key_ = next_data->time;
next_queue->callback(last_dispatched_key_, next_queue->callback(std::move(next_queue->queue.Pop()));
std::move(next_queue->queue.Pop()->value));
} }
} }

View File

@ -33,12 +33,9 @@ class OrderedMultiQueueTest : public ::testing::Test {
void SetUp() { void SetUp() {
for (const auto& queue_key : {kFirst, kSecond, kThird}) { for (const auto& queue_key : {kFirst, kSecond, kThird}) {
queue_.AddQueue(queue_key, [this](const common::Time time, queue_.AddQueue(queue_key, [this](std::unique_ptr<Data> data) {
std::unique_ptr<Data> data) {
EXPECT_EQ(common::ToUniversal(time), data->imu.linear_acceleration.x());
if (!values_.empty()) { if (!values_.empty()) {
EXPECT_GT(data->imu.linear_acceleration.x(), EXPECT_GT(data->time, values_.back().time);
values_.back().imu.linear_acceleration.x());
} }
values_.push_back(*data); values_.push_back(*data);
}); });
@ -47,8 +44,8 @@ class OrderedMultiQueueTest : public ::testing::Test {
std::unique_ptr<Data> MakeImu(const int ordinal) { std::unique_ptr<Data> MakeImu(const int ordinal) {
return common::make_unique<Data>( return common::make_unique<Data>(
"unused_frame_id", common::FromUniversal(ordinal), "unused_frame_id",
Data::Imu{ordinal * Eigen::Vector3d::UnitX(), Eigen::Vector3d::Zero()}); Data::Imu{Eigen::Vector3d::Zero(), Eigen::Vector3d::Zero()});
} }
std::vector<Data> values_; std::vector<Data> values_;
@ -56,30 +53,30 @@ class OrderedMultiQueueTest : public ::testing::Test {
}; };
TEST_F(OrderedMultiQueueTest, Ordering) { TEST_F(OrderedMultiQueueTest, Ordering) {
queue_.Add(kFirst, common::FromUniversal(4), MakeImu(4)); queue_.Add(kFirst, MakeImu(4));
queue_.Add(kFirst, common::FromUniversal(5), MakeImu(5)); queue_.Add(kFirst, MakeImu(5));
queue_.Add(kFirst, common::FromUniversal(6), MakeImu(6)); queue_.Add(kFirst, MakeImu(6));
EXPECT_TRUE(values_.empty()); EXPECT_TRUE(values_.empty());
queue_.Add(kSecond, common::FromUniversal(1), MakeImu(1)); queue_.Add(kSecond, MakeImu(1));
EXPECT_TRUE(values_.empty()); EXPECT_TRUE(values_.empty());
queue_.Add(kThird, common::FromUniversal(2), MakeImu(2)); queue_.Add(kThird, MakeImu(2));
EXPECT_EQ(values_.size(), 1); EXPECT_EQ(values_.size(), 1);
queue_.Add(kSecond, common::FromUniversal(3), MakeImu(3)); queue_.Add(kSecond, MakeImu(3));
EXPECT_EQ(values_.size(), 2); EXPECT_EQ(values_.size(), 2);
queue_.Add(kSecond, common::FromUniversal(7), MakeImu(7)); queue_.Add(kSecond, MakeImu(7));
queue_.Add(kThird, common::FromUniversal(8), MakeImu(8)); queue_.Add(kThird, MakeImu(8));
queue_.Flush(); queue_.Flush();
EXPECT_EQ(8, values_.size()); EXPECT_EQ(8, values_.size());
for (size_t i = 0; i < values_.size(); ++i) { for (size_t i = 0; i < values_.size(); ++i) {
EXPECT_EQ(i + 1, values_[i].imu.linear_acceleration.x()); EXPECT_EQ(i + 1, common::ToUniversal(values_[i].time));
} }
} }
TEST_F(OrderedMultiQueueTest, MarkQueueAsFinished) { TEST_F(OrderedMultiQueueTest, MarkQueueAsFinished) {
queue_.Add(kFirst, common::FromUniversal(1), MakeImu(1)); queue_.Add(kFirst, MakeImu(1));
queue_.Add(kFirst, common::FromUniversal(2), MakeImu(2)); queue_.Add(kFirst, MakeImu(2));
queue_.Add(kFirst, common::FromUniversal(3), MakeImu(3)); queue_.Add(kFirst, MakeImu(3));
EXPECT_TRUE(values_.empty()); EXPECT_TRUE(values_.empty());
queue_.MarkQueueAsFinished(kFirst); queue_.MarkQueueAsFinished(kFirst);
EXPECT_TRUE(values_.empty()); EXPECT_TRUE(values_.empty());
@ -89,7 +86,7 @@ TEST_F(OrderedMultiQueueTest, MarkQueueAsFinished) {
EXPECT_EQ(3, values_.size()); EXPECT_EQ(3, values_.size());
for (size_t i = 0; i < values_.size(); ++i) { for (size_t i = 0; i < values_.size(); ++i) {
EXPECT_EQ(i + 1, values_[i].imu.linear_acceleration.x()); EXPECT_EQ(i + 1, common::ToUniversal(values_[i].time));
} }
} }