From dd4d2af58b3a7f34b036abe5bf16b7d545b6e7d5 Mon Sep 17 00:00:00 2001 From: Alireza Date: Thu, 27 Sep 2018 12:44:29 +0200 Subject: [PATCH] feat: Publish progress of processing the bagfile (#940) --- .../cartographer_ros/playable_bag.cc | 37 ++++++++++++++++--- .../cartographer_ros/playable_bag.h | 14 ++++++- cartographer_ros_msgs/CMakeLists.txt | 1 + cartographer_ros_msgs/msg/BagfileProgress.msg | 23 ++++++++++++ 4 files changed, 69 insertions(+), 6 deletions(-) create mode 100644 cartographer_ros_msgs/msg/BagfileProgress.msg diff --git a/cartographer_ros/cartographer_ros/playable_bag.cc b/cartographer_ros/cartographer_ros/playable_bag.cc index 9fc9712..813e3ad 100644 --- a/cartographer_ros/cartographer_ros/playable_bag.cc +++ b/cartographer_ros/cartographer_ros/playable_bag.cc @@ -55,16 +55,28 @@ std::tuple PlayableBag::GetBeginEndTime() const { return std::make_tuple(view_->getBeginTime(), view_->getEndTime()); } -rosbag::MessageInstance PlayableBag::GetNextMessage() { +rosbag::MessageInstance PlayableBag::GetNextMessage( + cartographer_ros_msgs::BagfileProgress* progress) { CHECK(IsMessageAvailable()); const rosbag::MessageInstance msg = buffered_messages_.front(); buffered_messages_.pop_front(); AdvanceUntilMessageAvailable(); + double processed_seconds = (msg.getTime() - view_->getBeginTime()).toSec(); if ((log_counter_++ % 10000) == 0) { - LOG(INFO) << "Processed " << (msg.getTime() - view_->getBeginTime()).toSec() - << " of " << duration_in_seconds_ << " seconds of bag " - << bag_filename_; + LOG(INFO) << "Processed " << processed_seconds << " of " + << duration_in_seconds_ << " seconds of bag " << bag_filename_; } + + if (progress) { + progress->current_bagfile_name = bag_filename_; + progress->current_bagfile_id = bag_id_; + progress->total_messages = view_->size(); + progress->processed_messages = + std::distance(view_->begin(), view_iterator_); + progress->total_seconds = duration_in_seconds_; + progress->processed_seconds = processed_seconds; + } + return msg; } @@ -99,6 +111,12 @@ void PlayableBag::AdvanceUntilMessageAvailable() { } while (!finished_ && !IsMessageAvailable()); } +PlayableBagMultiplexer::PlayableBagMultiplexer() : pnh_("~") { + bag_progress_pub_ = pnh_.advertise( + "bagfile_progress", 10); + progress_pub_interval_ = pnh_.param("bagfile_progress_pub_interval", 10.0); +} + void PlayableBagMultiplexer::AddPlayableBag(PlayableBag playable_bag) { for (const auto& topic : playable_bag.topics()) { topics_.insert(topic); @@ -108,6 +126,7 @@ void PlayableBagMultiplexer::AddPlayableBag(PlayableBag playable_bag) { next_message_queue_.emplace( BagMessageItem{playable_bags_.back().PeekMessageTime(), static_cast(playable_bags_.size() - 1)}); + bag_progress_time_map_[playable_bag.bag_id()] = ros::Time::now(); } bool PlayableBagMultiplexer::IsMessageAvailable() const { @@ -119,7 +138,15 @@ PlayableBagMultiplexer::GetNextMessage() { CHECK(IsMessageAvailable()); const int current_bag_index = next_message_queue_.top().bag_index; PlayableBag& current_bag = playable_bags_.at(current_bag_index); - rosbag::MessageInstance msg = current_bag.GetNextMessage(); + cartographer_ros_msgs::BagfileProgress progress; + rosbag::MessageInstance msg = current_bag.GetNextMessage(&progress); + if (ros::Time::now() - bag_progress_time_map_[current_bag.bag_id()] >= + ros::Duration(progress_pub_interval_) && + bag_progress_pub_.getNumSubscribers() > 0) { + progress.total_bagfiles = playable_bags_.size(); + bag_progress_pub_.publish(progress); + bag_progress_time_map_[current_bag.bag_id()] = ros::Time::now(); + } CHECK_EQ(msg.getTime(), next_message_queue_.top().message_timestamp); next_message_queue_.pop(); if (current_bag.IsMessageAvailable()) { diff --git a/cartographer_ros/cartographer_ros/playable_bag.h b/cartographer_ros/cartographer_ros/playable_bag.h index 8526a86..afb5fdc 100644 --- a/cartographer_ros/cartographer_ros/playable_bag.h +++ b/cartographer_ros/cartographer_ros/playable_bag.h @@ -19,6 +19,8 @@ #include #include +#include +#include #include "rosbag/bag.h" #include "rosbag/view.h" #include "tf2_ros/buffer.h" @@ -38,7 +40,8 @@ class PlayableBag { FilteringEarlyMessageHandler filtering_early_message_handler); ros::Time PeekMessageTime() const; - rosbag::MessageInstance GetNextMessage(); + rosbag::MessageInstance GetNextMessage( + cartographer_ros_msgs::BagfileProgress* progress); bool IsMessageAvailable() const; std::tuple GetBeginEndTime() const; @@ -65,6 +68,7 @@ class PlayableBag { class PlayableBagMultiplexer { public: + PlayableBagMultiplexer(); void AddPlayableBag(PlayableBag playable_bag); // Returns the next message from the multiplexed (merge-sorted) message @@ -90,6 +94,14 @@ class PlayableBagMultiplexer { }; }; + ros::NodeHandle pnh_; + // Publishes information about the bag-file(s) processing and its progress + ros::Publisher bag_progress_pub_; + // Map between bagfile id and the last time when its progress was published + std::map bag_progress_time_map_; + // The time interval of publishing bag-file(s) processing in seconds + double progress_pub_interval_; + std::vector playable_bags_; std::priority_queue, BagMessageItem::TimestampIsGreater> diff --git a/cartographer_ros_msgs/CMakeLists.txt b/cartographer_ros_msgs/CMakeLists.txt index dcdbc51..0bc9bf2 100644 --- a/cartographer_ros_msgs/CMakeLists.txt +++ b/cartographer_ros_msgs/CMakeLists.txt @@ -26,6 +26,7 @@ find_package(catkin REQUIRED COMPONENTS message_generation ${PACKAGE_DEPENDENCIE add_message_files( DIRECTORY msg FILES + BagfileProgress.msg LandmarkEntry.msg LandmarkList.msg SensorTopics.msg diff --git a/cartographer_ros_msgs/msg/BagfileProgress.msg b/cartographer_ros_msgs/msg/BagfileProgress.msg new file mode 100644 index 0000000..88a810e --- /dev/null +++ b/cartographer_ros_msgs/msg/BagfileProgress.msg @@ -0,0 +1,23 @@ +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Contains general information about the bagfiles processing progress + +string current_bagfile_name +uint32 current_bagfile_id +uint32 total_bagfiles +uint32 total_messages +uint32 processed_messages +float32 total_seconds +float32 processed_seconds