feat: Publish progress of processing the bagfile (#940)

master
Alireza 2018-09-27 12:44:29 +02:00 committed by Alexander Belyaev
parent 00813a4d6b
commit dd4d2af58b
4 changed files with 69 additions and 6 deletions

View File

@ -55,16 +55,28 @@ std::tuple<ros::Time, ros::Time> PlayableBag::GetBeginEndTime() const {
return std::make_tuple(view_->getBeginTime(), view_->getEndTime());
}
rosbag::MessageInstance PlayableBag::GetNextMessage() {
rosbag::MessageInstance PlayableBag::GetNextMessage(
cartographer_ros_msgs::BagfileProgress* progress) {
CHECK(IsMessageAvailable());
const rosbag::MessageInstance msg = buffered_messages_.front();
buffered_messages_.pop_front();
AdvanceUntilMessageAvailable();
double processed_seconds = (msg.getTime() - view_->getBeginTime()).toSec();
if ((log_counter_++ % 10000) == 0) {
LOG(INFO) << "Processed " << (msg.getTime() - view_->getBeginTime()).toSec()
<< " of " << duration_in_seconds_ << " seconds of bag "
<< bag_filename_;
LOG(INFO) << "Processed " << processed_seconds << " of "
<< duration_in_seconds_ << " seconds of bag " << bag_filename_;
}
if (progress) {
progress->current_bagfile_name = bag_filename_;
progress->current_bagfile_id = bag_id_;
progress->total_messages = view_->size();
progress->processed_messages =
std::distance(view_->begin(), view_iterator_);
progress->total_seconds = duration_in_seconds_;
progress->processed_seconds = processed_seconds;
}
return msg;
}
@ -99,6 +111,12 @@ void PlayableBag::AdvanceUntilMessageAvailable() {
} while (!finished_ && !IsMessageAvailable());
}
PlayableBagMultiplexer::PlayableBagMultiplexer() : pnh_("~") {
bag_progress_pub_ = pnh_.advertise<cartographer_ros_msgs::BagfileProgress>(
"bagfile_progress", 10);
progress_pub_interval_ = pnh_.param("bagfile_progress_pub_interval", 10.0);
}
void PlayableBagMultiplexer::AddPlayableBag(PlayableBag playable_bag) {
for (const auto& topic : playable_bag.topics()) {
topics_.insert(topic);
@ -108,6 +126,7 @@ void PlayableBagMultiplexer::AddPlayableBag(PlayableBag playable_bag) {
next_message_queue_.emplace(
BagMessageItem{playable_bags_.back().PeekMessageTime(),
static_cast<int>(playable_bags_.size() - 1)});
bag_progress_time_map_[playable_bag.bag_id()] = ros::Time::now();
}
bool PlayableBagMultiplexer::IsMessageAvailable() const {
@ -119,7 +138,15 @@ PlayableBagMultiplexer::GetNextMessage() {
CHECK(IsMessageAvailable());
const int current_bag_index = next_message_queue_.top().bag_index;
PlayableBag& current_bag = playable_bags_.at(current_bag_index);
rosbag::MessageInstance msg = current_bag.GetNextMessage();
cartographer_ros_msgs::BagfileProgress progress;
rosbag::MessageInstance msg = current_bag.GetNextMessage(&progress);
if (ros::Time::now() - bag_progress_time_map_[current_bag.bag_id()] >=
ros::Duration(progress_pub_interval_) &&
bag_progress_pub_.getNumSubscribers() > 0) {
progress.total_bagfiles = playable_bags_.size();
bag_progress_pub_.publish(progress);
bag_progress_time_map_[current_bag.bag_id()] = ros::Time::now();
}
CHECK_EQ(msg.getTime(), next_message_queue_.top().message_timestamp);
next_message_queue_.pop();
if (current_bag.IsMessageAvailable()) {

View File

@ -19,6 +19,8 @@
#include <functional>
#include <queue>
#include <cartographer_ros_msgs/BagfileProgress.h>
#include <ros/node_handle.h>
#include "rosbag/bag.h"
#include "rosbag/view.h"
#include "tf2_ros/buffer.h"
@ -38,7 +40,8 @@ class PlayableBag {
FilteringEarlyMessageHandler filtering_early_message_handler);
ros::Time PeekMessageTime() const;
rosbag::MessageInstance GetNextMessage();
rosbag::MessageInstance GetNextMessage(
cartographer_ros_msgs::BagfileProgress* progress);
bool IsMessageAvailable() const;
std::tuple<ros::Time, ros::Time> GetBeginEndTime() const;
@ -65,6 +68,7 @@ class PlayableBag {
class PlayableBagMultiplexer {
public:
PlayableBagMultiplexer();
void AddPlayableBag(PlayableBag playable_bag);
// Returns the next message from the multiplexed (merge-sorted) message
@ -90,6 +94,14 @@ class PlayableBagMultiplexer {
};
};
ros::NodeHandle pnh_;
// Publishes information about the bag-file(s) processing and its progress
ros::Publisher bag_progress_pub_;
// Map between bagfile id and the last time when its progress was published
std::map<int, ros::Time> bag_progress_time_map_;
// The time interval of publishing bag-file(s) processing in seconds
double progress_pub_interval_;
std::vector<PlayableBag> playable_bags_;
std::priority_queue<BagMessageItem, std::vector<BagMessageItem>,
BagMessageItem::TimestampIsGreater>

View File

@ -26,6 +26,7 @@ find_package(catkin REQUIRED COMPONENTS message_generation ${PACKAGE_DEPENDENCIE
add_message_files(
DIRECTORY msg
FILES
BagfileProgress.msg
LandmarkEntry.msg
LandmarkList.msg
SensorTopics.msg

View File

@ -0,0 +1,23 @@
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Contains general information about the bagfiles processing progress
string current_bagfile_name
uint32 current_bagfile_id
uint32 total_bagfiles
uint32 total_messages
uint32 processed_messages
float32 total_seconds
float32 processed_seconds