Change dropping data before start to be idempotent. (#133)
We drop as much data from each queue as possible without changing the start time that would be determined if we only saw the dispatched data, i.e. we retain one piece of data not beyond the common start time. This makes the process idempotent.master
parent
e703b1cea3
commit
7dc3ab1e9e
|
@ -175,6 +175,7 @@ google_test(common_configuration_files_test
|
||||||
SRCS
|
SRCS
|
||||||
configuration_files_test.cc
|
configuration_files_test.cc
|
||||||
DEPENDS
|
DEPENDS
|
||||||
|
common_config
|
||||||
common_configuration_file_resolver
|
common_configuration_file_resolver
|
||||||
common_lua_parameter_dictionary
|
common_lua_parameter_dictionary
|
||||||
mapping_map_builder
|
mapping_map_builder
|
||||||
|
|
|
@ -33,8 +33,9 @@ TEST(ConfigurationFilesTest, ValidateMapBuilderOptions) {
|
||||||
return MAP_BUILDER)text";
|
return MAP_BUILDER)text";
|
||||||
EXPECT_NO_FATAL_FAILURE({
|
EXPECT_NO_FATAL_FAILURE({
|
||||||
auto file_resolver = ::cartographer::common::make_unique<
|
auto file_resolver = ::cartographer::common::make_unique<
|
||||||
::cartographer::common::ConfigurationFileResolver>(std::vector<string>{
|
::cartographer::common::ConfigurationFileResolver>(
|
||||||
string(::cartographer::common::kSourceDirectory) + "/configuration_files"});
|
std::vector<string>{string(::cartographer::common::kSourceDirectory) +
|
||||||
|
"/configuration_files"});
|
||||||
::cartographer::common::LuaParameterDictionary lua_parameter_dictionary(
|
::cartographer::common::LuaParameterDictionary lua_parameter_dictionary(
|
||||||
kCode, std::move(file_resolver));
|
kCode, std::move(file_resolver));
|
||||||
::cartographer::mapping::CreateMapBuilderOptions(&lua_parameter_dictionary);
|
::cartographer::mapping::CreateMapBuilderOptions(&lua_parameter_dictionary);
|
||||||
|
|
|
@ -168,13 +168,27 @@ class OrderedMultiQueue {
|
||||||
<< common_start_time_ << "'.";
|
<< common_start_time_ << "'.";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (next_data->time < common_start_time_) {
|
if (next_data->time >= common_start_time_) {
|
||||||
next_queue->queue.Pop();
|
// Happy case, we are beyond the 'common_start_time_' already.
|
||||||
continue;
|
last_dispatched_time_ = next_data->time;
|
||||||
|
next_queue->callback(next_queue->queue.Pop());
|
||||||
|
} else if (next_queue->queue.Size() < 2) {
|
||||||
|
if (!next_queue->finished) {
|
||||||
|
// We cannot decide whether to drop or dispatch this yet.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
last_dispatched_time_ = next_data->time;
|
||||||
|
next_queue->callback(next_queue->queue.Pop());
|
||||||
|
} else {
|
||||||
|
// We take a peek at the time after next data. If it also is not beyond
|
||||||
|
// 'common_start_time_' we drop 'next_data', otherwise we just found the
|
||||||
|
// first packet to dispatch from this queue.
|
||||||
|
std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
|
||||||
|
if (next_queue->queue.Peek<Data>()->time > common_start_time_) {
|
||||||
|
last_dispatched_time_ = next_data->time;
|
||||||
|
next_queue->callback(std::move(next_data_owner));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
last_dispatched_time_ = next_data->time;
|
|
||||||
next_queue->callback(next_queue->queue.Pop());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue