Rosbags
Rosbags is a pure Python library designed to read, modify, convert, and write rosbag files for both ROS 1 and ROS 2. It offers high-level interfaces, support for various rosbag formats (rosbag1 and rosbag2), an extensible type system with serializers and deserializers, and efficient conversion capabilities. The library is actively maintained with regular releases. The current version is 0.11.0.
Warnings
- breaking The import paths for serialization/deserialization modules were refactored, specifically affecting imports from `rosbags.serde.serdes`. Code relying on these old paths will break.
- gotcha When reading rosbag files, especially those without embedded message definitions or containing custom message types, `rosbags` requires an explicit `Typestore` to correctly deserialize messages. Failing to provide an appropriate `default_typestore` can lead to deserialization errors or incorrect message interpretation.
- gotcha While `rosbags` itself is designed for efficient handling of bag files, processing extremely large rosbag files (e.g., hundreds of GBs or millions of messages) without care can still lead to high memory consumption, especially if users attempt to load all messages or connections into memory simultaneously. Iterators are provided for a reason.
Install
-
pip install rosbags
Imports
- AnyReader
from rosbags.highlevel import AnyReader
- Stores
from rosbags.typesys import Stores
- get_typestore
from rosbags.typesys import get_typestore
- Reader
from rosbags.rosbag2 import Reader
- Serde
from rosbags.serde import Serde
Quickstart
from pathlib import Path
from rosbags.highlevel import AnyReader
from rosbags.typesys import Stores, get_typestore
# Replace with the actual path to your rosbag file/directory
# For testing, you can create a dummy bag file or use an existing one.
# Example: bagpath = Path('./path/to/my_rosbag')
# Ensure the path points to the directory containing a ROS2 bag or a ROS1 .bag file.
bagpath = Path('test_rosbag_dir') # Placeholder, modify as needed
# Create a type store to use if the bag has no message definitions or for custom types.
# ROS2_FOXY is an example; choose the appropriate ROS distribution if needed.
typestore = get_typestore(Stores.ROS2_FOXY)
try:
# Create reader instance and open for reading.
with AnyReader([bagpath], default_typestore=typestore) as reader:
# Filter connections (topics) if desired. Here, we read all messages.
# connections = [x for x in reader.connections if x.topic == '/imu_raw/Imu']
print(f"Found {len(reader.connections)} connections (topics) in the bag.")
for connection in reader.connections:
print(f" Topic: {connection.topic}, Type: {connection.msgtype}")
message_count = 0
for connection, timestamp, rawdata in reader.messages():
# Deserialize the message data
msg = reader.deserialize(rawdata, connection.msgtype)
# Access message fields (example for a common message type)
# This assumes your message type has a 'header' and 'frame_id'
# Adapt based on the actual message types in your bag.
try:
if hasattr(msg, 'header') and hasattr(msg.header, 'frame_id'):
print(f"[{timestamp}] Topic: {connection.topic}, Frame ID: {msg.header.frame_id}")
else:
print(f"[{timestamp}] Topic: {connection.topic}, Message: {msg}")
except AttributeError:
print(f"[{timestamp}] Topic: {connection.topic}, Message: {msg}") # Fallback for messages without header
message_count += 1
if message_count >= 10: # Limit output for brevity
print("... (truncated after 10 messages)")
break
except FileNotFoundError:
print(f"Error: Bag file or directory not found at {bagpath}. Please create or specify a valid path.")
except Exception as e:
print(f"An error occurred: {e}")