Move the OrderedMultiQueue implementation out of the header. (#152)
parent
0e826377c4
commit
f3526bd252
|
@ -110,10 +110,12 @@ TEST_F(TrajectoryConnectivityTest, ConnectedComponents) {
|
||||||
five_cluster = &connections[0];
|
five_cluster = &connections[0];
|
||||||
}
|
}
|
||||||
for (int i = 0; i <= 9; ++i) {
|
for (int i = 0; i <= 9; ++i) {
|
||||||
EXPECT_EQ(i <= 4, std::find(zero_cluster->begin(), zero_cluster->end(),
|
EXPECT_EQ(i <= 4,
|
||||||
trajectory(i)) != zero_cluster->end());
|
std::find(zero_cluster->begin(), zero_cluster->end(),
|
||||||
EXPECT_EQ(i > 4, std::find(five_cluster->begin(), five_cluster->end(),
|
trajectory(i)) != zero_cluster->end());
|
||||||
trajectory(i)) != five_cluster->end());
|
EXPECT_EQ(i > 4,
|
||||||
|
std::find(five_cluster->begin(), five_cluster->end(),
|
||||||
|
trajectory(i)) != five_cluster->end());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,12 +123,14 @@ TEST_F(TrajectoryConnectivityTest, ConnectionCount) {
|
||||||
for (int i = 0; i < 10; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
trajectory_connectivity_.Connect(trajectory(0), trajectory(1));
|
trajectory_connectivity_.Connect(trajectory(0), trajectory(1));
|
||||||
// Permute the arguments to check invariance.
|
// Permute the arguments to check invariance.
|
||||||
EXPECT_EQ(i + 1, trajectory_connectivity_.ConnectionCount(trajectory(1),
|
EXPECT_EQ(
|
||||||
trajectory(0)));
|
i + 1,
|
||||||
|
trajectory_connectivity_.ConnectionCount(trajectory(1), trajectory(0)));
|
||||||
}
|
}
|
||||||
for (int i = 1; i < 9; ++i) {
|
for (int i = 1; i < 9; ++i) {
|
||||||
EXPECT_EQ(0, trajectory_connectivity_.ConnectionCount(trajectory(i),
|
EXPECT_EQ(0,
|
||||||
trajectory(i + 1)));
|
trajectory_connectivity_.ConnectionCount(trajectory(i),
|
||||||
|
trajectory(i + 1)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -175,8 +175,9 @@ void CastRays(const sensor::LaserFan& laser_fan, const MapLimits& limits,
|
||||||
|
|
||||||
// Finally, compute and add empty rays based on missing echos in the scan.
|
// Finally, compute and add empty rays based on missing echos in the scan.
|
||||||
for (const Eigen::Vector3f& missing_echo : laser_fan.misses) {
|
for (const Eigen::Vector3f& missing_echo : laser_fan.misses) {
|
||||||
CastRay(begin, superscaled_limits.GetXYIndexOfCellContainingPoint(
|
CastRay(begin,
|
||||||
missing_echo.x(), missing_echo.y()),
|
superscaled_limits.GetXYIndexOfCellContainingPoint(
|
||||||
|
missing_echo.x(), missing_echo.y()),
|
||||||
miss_visitor);
|
miss_visitor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,10 +75,11 @@ TEST_F(InterpolatedGridTest, MonotonicBehaviorBetweenGridPointsInX) {
|
||||||
for (double sample = kSampleStep;
|
for (double sample = kSampleStep;
|
||||||
sample < hybrid_grid_.resolution() - 2 * kSampleStep;
|
sample < hybrid_grid_.resolution() - 2 * kSampleStep;
|
||||||
sample += kSampleStep) {
|
sample += kSampleStep) {
|
||||||
EXPECT_LT(0., grid_difference * (interpolated_grid_.GetProbability(
|
EXPECT_LT(0.,
|
||||||
x + sample + kSampleStep, y, z) -
|
grid_difference *
|
||||||
interpolated_grid_.GetProbability(
|
(interpolated_grid_.GetProbability(
|
||||||
x + sample, y, z)));
|
x + sample + kSampleStep, y, z) -
|
||||||
|
interpolated_grid_.GetProbability(x + sample, y, z)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,9 +61,10 @@ TEST(PrecomputedGridGeneratorTest, TestAgainstNaiveAlgorithm) {
|
||||||
for (int dx = 0; dx < width; ++dx) {
|
for (int dx = 0; dx < width; ++dx) {
|
||||||
for (int dy = 0; dy < width; ++dy) {
|
for (int dy = 0; dy < width; ++dy) {
|
||||||
for (int dz = 0; dz < width; ++dz) {
|
for (int dz = 0; dz < width; ++dz) {
|
||||||
max_probability = std::max(
|
max_probability =
|
||||||
max_probability, hybrid_grid.GetProbability(
|
std::max(max_probability,
|
||||||
Eigen::Array3i(x + dx, y + dy, z + dz)));
|
hybrid_grid.GetProbability(
|
||||||
|
Eigen::Array3i(x + dx, y + dy, z + dz)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,14 +214,14 @@ void SparsePoseGraph::ComputeConstraintsForScan(
|
||||||
// Unchanged covariance as (submap <- map) is a translation.
|
// Unchanged covariance as (submap <- map) is a translation.
|
||||||
const transform::Rigid3d constraint_transform =
|
const transform::Rigid3d constraint_transform =
|
||||||
submap->local_pose().inverse() * pose;
|
submap->local_pose().inverse() * pose;
|
||||||
constraints_.push_back(
|
constraints_.push_back(Constraint{
|
||||||
Constraint{submap_index,
|
submap_index,
|
||||||
scan_index,
|
scan_index,
|
||||||
{constraint_transform,
|
{constraint_transform, common::ComputeSpdMatrixSqrtInverse(
|
||||||
common::ComputeSpdMatrixSqrtInverse(
|
covariance,
|
||||||
covariance, options_.constraint_builder_options()
|
options_.constraint_builder_options()
|
||||||
.lower_covariance_eigenvalue_bound())},
|
.lower_covariance_eigenvalue_bound())},
|
||||||
Constraint::INTRA_SUBMAP});
|
Constraint::INTRA_SUBMAP});
|
||||||
}
|
}
|
||||||
|
|
||||||
CHECK_LT(submap_states_.size(), std::numeric_limits<int>::max());
|
CHECK_LT(submap_states_.size(), std::numeric_limits<int>::max());
|
||||||
|
|
|
@ -78,6 +78,9 @@ google_library(sensor_laser
|
||||||
)
|
)
|
||||||
|
|
||||||
google_library(sensor_ordered_multi_queue
|
google_library(sensor_ordered_multi_queue
|
||||||
|
USES_GLOG
|
||||||
|
SRCS
|
||||||
|
ordered_multi_queue.cc
|
||||||
HDRS
|
HDRS
|
||||||
ordered_multi_queue.h
|
ordered_multi_queue.h
|
||||||
DEPENDS
|
DEPENDS
|
||||||
|
|
|
@ -0,0 +1,173 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2016 The Cartographer Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "cartographer/sensor/ordered_multi_queue.h"
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <sstream>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "cartographer/common/make_unique.h"
|
||||||
|
#include "glog/logging.h"
|
||||||
|
|
||||||
|
namespace cartographer {
|
||||||
|
namespace sensor {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
// Number of items that can be queued up before we log which queues are waiting
|
||||||
|
// for data.
|
||||||
|
const int kMaxQueueSize = 500;
|
||||||
|
|
||||||
|
inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) {
|
||||||
|
return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')';
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
OrderedMultiQueue::OrderedMultiQueue() {}
|
||||||
|
|
||||||
|
OrderedMultiQueue::~OrderedMultiQueue() {
|
||||||
|
for (auto& entry : queues_) {
|
||||||
|
CHECK(entry.second.finished);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OrderedMultiQueue::AddQueue(const QueueKey& queue_key, Callback callback) {
|
||||||
|
CHECK_EQ(queues_.count(queue_key), 0);
|
||||||
|
queues_[queue_key].callback = callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
void OrderedMultiQueue::MarkQueueAsFinished(const QueueKey& queue_key) {
|
||||||
|
auto it = queues_.find(queue_key);
|
||||||
|
CHECK(it != queues_.end()) << "Did not find '" << queue_key << "'.";
|
||||||
|
auto& queue = it->second;
|
||||||
|
CHECK(!queue.finished);
|
||||||
|
queue.finished = true;
|
||||||
|
Dispatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
void OrderedMultiQueue::Add(const QueueKey& queue_key,
|
||||||
|
std::unique_ptr<Data> data) {
|
||||||
|
auto it = queues_.find(queue_key);
|
||||||
|
if (it == queues_.end()) {
|
||||||
|
LOG_EVERY_N(WARNING, 1000) << "Ignored data for queue: '" << queue_key
|
||||||
|
<< "'";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
it->second.queue.Push(std::move(data));
|
||||||
|
Dispatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
void OrderedMultiQueue::Flush() {
|
||||||
|
std::vector<QueueKey> unfinished_queues;
|
||||||
|
for (auto& entry : queues_) {
|
||||||
|
if (!entry.second.finished) {
|
||||||
|
unfinished_queues.push_back(entry.first);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (auto& unfinished_queue : unfinished_queues) {
|
||||||
|
MarkQueueAsFinished(unfinished_queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OrderedMultiQueue::Dispatch() {
|
||||||
|
while (true) {
|
||||||
|
Queue* next_queue = nullptr;
|
||||||
|
const Data* next_data = nullptr;
|
||||||
|
for (auto it = queues_.begin(); it != queues_.end();) {
|
||||||
|
auto& queue = it->second.queue;
|
||||||
|
const auto* data = queue.Peek<Data>();
|
||||||
|
if (data == nullptr) {
|
||||||
|
if (it->second.finished) {
|
||||||
|
queues_.erase(it++);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
CannotMakeProgress();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (next_data == nullptr ||
|
||||||
|
std::forward_as_tuple(data->time, it->first) <
|
||||||
|
std::forward_as_tuple(next_data->time, it->first)) {
|
||||||
|
next_data = data;
|
||||||
|
next_queue = &it->second;
|
||||||
|
}
|
||||||
|
CHECK_LE(last_dispatched_time_, next_data->time)
|
||||||
|
<< "Non-sorted data added to queue: '" << it->first << "'";
|
||||||
|
++it;
|
||||||
|
}
|
||||||
|
if (next_data == nullptr) {
|
||||||
|
CHECK(queues_.empty());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we haven't dispatched any data yet, fast forward all queues until a
|
||||||
|
// common start time has been reached.
|
||||||
|
if (common_start_time_ == common::Time::min()) {
|
||||||
|
for (auto& entry : queues_) {
|
||||||
|
common_start_time_ =
|
||||||
|
std::max(common_start_time_, entry.second.queue.Peek<Data>()->time);
|
||||||
|
}
|
||||||
|
LOG(INFO) << "All sensor data is available starting at '"
|
||||||
|
<< common_start_time_ << "'.";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (next_data->time >= common_start_time_) {
|
||||||
|
// Happy case, we are beyond the 'common_start_time_' already.
|
||||||
|
last_dispatched_time_ = next_data->time;
|
||||||
|
next_queue->callback(next_queue->queue.Pop());
|
||||||
|
} else if (next_queue->queue.Size() < 2) {
|
||||||
|
if (!next_queue->finished) {
|
||||||
|
// We cannot decide whether to drop or dispatch this yet.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
last_dispatched_time_ = next_data->time;
|
||||||
|
next_queue->callback(next_queue->queue.Pop());
|
||||||
|
} else {
|
||||||
|
// We take a peek at the time after next data. If it also is not beyond
|
||||||
|
// 'common_start_time_' we drop 'next_data', otherwise we just found the
|
||||||
|
// first packet to dispatch from this queue.
|
||||||
|
std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
|
||||||
|
if (next_queue->queue.Peek<Data>()->time > common_start_time_) {
|
||||||
|
last_dispatched_time_ = next_data->time;
|
||||||
|
next_queue->callback(std::move(next_data_owner));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OrderedMultiQueue::CannotMakeProgress() {
|
||||||
|
for (auto& entry : queues_) {
|
||||||
|
if (entry.second.queue.Size() > kMaxQueueSize) {
|
||||||
|
LOG_EVERY_N(WARNING, 60) << "Queues waiting for data: "
|
||||||
|
<< EmptyQueuesDebugString();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
string OrderedMultiQueue::EmptyQueuesDebugString() {
|
||||||
|
std::ostringstream empty_queues;
|
||||||
|
for (auto& entry : queues_) {
|
||||||
|
if (entry.second.queue.Size() == 0) {
|
||||||
|
empty_queues << (empty_queues.tellp() > 0 ? ", " : "") << entry.first;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return empty_queues.str();
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace sensor
|
||||||
|
} // namespace cartographer
|
|
@ -17,14 +17,13 @@
|
||||||
#ifndef CARTOGRAPHER_SENSOR_ORDERED_MULTI_QUEUE_H_
|
#ifndef CARTOGRAPHER_SENSOR_ORDERED_MULTI_QUEUE_H_
|
||||||
#define CARTOGRAPHER_SENSOR_ORDERED_MULTI_QUEUE_H_
|
#define CARTOGRAPHER_SENSOR_ORDERED_MULTI_QUEUE_H_
|
||||||
|
|
||||||
#include <algorithm>
|
#include <functional>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <set>
|
#include <string>
|
||||||
#include <sstream>
|
#include <tuple>
|
||||||
|
|
||||||
#include "cartographer/common/blocking_queue.h"
|
#include "cartographer/common/blocking_queue.h"
|
||||||
#include "cartographer/common/make_unique.h"
|
|
||||||
#include "cartographer/common/port.h"
|
#include "cartographer/common/port.h"
|
||||||
#include "cartographer/common/time.h"
|
#include "cartographer/common/time.h"
|
||||||
#include "cartographer/sensor/data.h"
|
#include "cartographer/sensor/data.h"
|
||||||
|
@ -32,10 +31,6 @@
|
||||||
namespace cartographer {
|
namespace cartographer {
|
||||||
namespace sensor {
|
namespace sensor {
|
||||||
|
|
||||||
// Number of items that can be queued up before we log which queues are waiting
|
|
||||||
// for data.
|
|
||||||
const int kMaxQueueSize = 500;
|
|
||||||
|
|
||||||
struct QueueKey {
|
struct QueueKey {
|
||||||
int trajectory_id;
|
int trajectory_id;
|
||||||
string sensor_id;
|
string sensor_id;
|
||||||
|
@ -46,10 +41,6 @@ struct QueueKey {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) {
|
|
||||||
return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')';
|
|
||||||
}
|
|
||||||
|
|
||||||
// Maintains multiple queues of sorted sensor data and dispatches it in merge
|
// Maintains multiple queues of sorted sensor data and dispatches it in merge
|
||||||
// sorted order. It will wait to see at least one value for each unfinished
|
// sorted order. It will wait to see at least one value for each unfinished
|
||||||
// queue before dispatching the next time ordered value across all queues.
|
// queue before dispatching the next time ordered value across all queues.
|
||||||
|
@ -59,50 +50,24 @@ class OrderedMultiQueue {
|
||||||
public:
|
public:
|
||||||
using Callback = std::function<void(std::unique_ptr<Data>)>;
|
using Callback = std::function<void(std::unique_ptr<Data>)>;
|
||||||
|
|
||||||
OrderedMultiQueue() {}
|
OrderedMultiQueue();
|
||||||
|
~OrderedMultiQueue();
|
||||||
|
|
||||||
~OrderedMultiQueue() {
|
// Adds a new queue with key 'queue_key' which must not already exist.
|
||||||
for (auto& entry : queues_) {
|
// 'callback' will be called whenever data from this queue can be dispatched.
|
||||||
CHECK(entry.second.finished);
|
void AddQueue(const QueueKey& queue_key, Callback callback);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void AddQueue(const QueueKey& queue_key, Callback callback) {
|
// Marks a queue as finished, i.e. no further data can be added. The queue
|
||||||
CHECK(FindOrNull(queue_key) == nullptr);
|
// will be removed once the last piece of data from it has been dispatched.
|
||||||
queues_[queue_key].callback = callback;
|
void MarkQueueAsFinished(const QueueKey& queue_key);
|
||||||
}
|
|
||||||
|
|
||||||
void MarkQueueAsFinished(const QueueKey& queue_key) {
|
// Adds 'data' to a queue with the given 'queue_key'. Data must be added
|
||||||
auto& queue = FindOrDie(queue_key);
|
// sorted per queue.
|
||||||
CHECK(!queue.finished);
|
void Add(const QueueKey& queue_key, std::unique_ptr<Data> data);
|
||||||
queue.finished = true;
|
|
||||||
Dispatch();
|
|
||||||
}
|
|
||||||
|
|
||||||
void Add(const QueueKey& queue_key, std::unique_ptr<Data> data) {
|
|
||||||
auto* queue = FindOrNull(queue_key);
|
|
||||||
if (queue == nullptr) {
|
|
||||||
LOG_EVERY_N(WARNING, 1000) << "Ignored data for queue: '" << queue_key
|
|
||||||
<< "'";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
queue->queue.Push(std::move(data));
|
|
||||||
Dispatch();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dispatches all remaining values in sorted order and removes the underlying
|
// Dispatches all remaining values in sorted order and removes the underlying
|
||||||
// queues.
|
// queues.
|
||||||
void Flush() {
|
void Flush();
|
||||||
std::vector<QueueKey> unfinished_queues;
|
|
||||||
for (auto& entry : queues_) {
|
|
||||||
if (!entry.second.finished) {
|
|
||||||
unfinished_queues.push_back(entry.first);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (auto& unfinished_queue : unfinished_queues) {
|
|
||||||
MarkQueueAsFinished(unfinished_queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct Queue {
|
struct Queue {
|
||||||
|
@ -111,107 +76,9 @@ class OrderedMultiQueue {
|
||||||
bool finished = false;
|
bool finished = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Returns the queue with 'key' or LOG(FATAL).
|
void Dispatch();
|
||||||
Queue& FindOrDie(const QueueKey& key) {
|
void CannotMakeProgress();
|
||||||
auto it = queues_.find(key);
|
string EmptyQueuesDebugString();
|
||||||
CHECK(it != queues_.end()) << "Did not find '" << key << "'.";
|
|
||||||
return it->second;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns the queue with 'key' or nullptr.
|
|
||||||
Queue* FindOrNull(const QueueKey& key) {
|
|
||||||
auto it = queues_.find(key);
|
|
||||||
if (it == queues_.end()) {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
return &it->second;
|
|
||||||
}
|
|
||||||
|
|
||||||
void Dispatch() {
|
|
||||||
while (true) {
|
|
||||||
Queue* next_queue = nullptr;
|
|
||||||
const Data* next_data = nullptr;
|
|
||||||
for (auto it = queues_.begin(); it != queues_.end();) {
|
|
||||||
auto& queue = it->second.queue;
|
|
||||||
const auto* data = queue.Peek<Data>();
|
|
||||||
if (data == nullptr) {
|
|
||||||
if (it->second.finished) {
|
|
||||||
queues_.erase(it++);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
CannotMakeProgress();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (next_data == nullptr ||
|
|
||||||
std::forward_as_tuple(data->time, it->first) <
|
|
||||||
std::forward_as_tuple(next_data->time, it->first)) {
|
|
||||||
next_data = data;
|
|
||||||
next_queue = &it->second;
|
|
||||||
}
|
|
||||||
CHECK_LE(last_dispatched_time_, next_data->time)
|
|
||||||
<< "Non-sorted data added to queue: '" << it->first << "'";
|
|
||||||
++it;
|
|
||||||
}
|
|
||||||
if (next_data == nullptr) {
|
|
||||||
CHECK(queues_.empty());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we haven't dispatched any data yet, fast forward all queues until a
|
|
||||||
// common start time has been reached.
|
|
||||||
if (common_start_time_ == common::Time::min()) {
|
|
||||||
for (auto& entry : queues_) {
|
|
||||||
common_start_time_ = std::max(common_start_time_,
|
|
||||||
entry.second.queue.Peek<Data>()->time);
|
|
||||||
}
|
|
||||||
LOG(INFO) << "All sensor data is available starting at '"
|
|
||||||
<< common_start_time_ << "'.";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (next_data->time >= common_start_time_) {
|
|
||||||
// Happy case, we are beyond the 'common_start_time_' already.
|
|
||||||
last_dispatched_time_ = next_data->time;
|
|
||||||
next_queue->callback(next_queue->queue.Pop());
|
|
||||||
} else if (next_queue->queue.Size() < 2) {
|
|
||||||
if (!next_queue->finished) {
|
|
||||||
// We cannot decide whether to drop or dispatch this yet.
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
last_dispatched_time_ = next_data->time;
|
|
||||||
next_queue->callback(next_queue->queue.Pop());
|
|
||||||
} else {
|
|
||||||
// We take a peek at the time after next data. If it also is not beyond
|
|
||||||
// 'common_start_time_' we drop 'next_data', otherwise we just found the
|
|
||||||
// first packet to dispatch from this queue.
|
|
||||||
std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
|
|
||||||
if (next_queue->queue.Peek<Data>()->time > common_start_time_) {
|
|
||||||
last_dispatched_time_ = next_data->time;
|
|
||||||
next_queue->callback(std::move(next_data_owner));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Called when not all necessary queues are filled to dispatch messages.
|
|
||||||
void CannotMakeProgress() {
|
|
||||||
for (auto& entry : queues_) {
|
|
||||||
if (entry.second.queue.Size() > kMaxQueueSize) {
|
|
||||||
LOG_EVERY_N(WARNING, 60) << "Queues waiting for data: "
|
|
||||||
<< EmptyQueuesDebugString();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
string EmptyQueuesDebugString() {
|
|
||||||
std::ostringstream empty_queues;
|
|
||||||
for (auto& entry : queues_) {
|
|
||||||
if (entry.second.queue.Size() == 0) {
|
|
||||||
empty_queues << (empty_queues.tellp() > 0 ? ", " : "") << entry.first;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return empty_queues.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Used to verify that values are dispatched in sorted order.
|
// Used to verify that values are dispatched in sorted order.
|
||||||
common::Time last_dispatched_time_ = common::Time::min();
|
common::Time last_dispatched_time_ = common::Time::min();
|
||||||
|
|
Loading…
Reference in New Issue