Rosbags

0.11.0 · active · verified Tue Apr 14

Rosbags is a pure Python library for reading, modifying, converting, and writing rosbag files for both ROS 1 and ROS 2. It offers high-level interfaces, support for the rosbag1 and rosbag2 formats, an extensible type system with serializers and deserializers, and efficient conversion between formats. The library is actively maintained with regular releases; the current version is 0.11.0.

Quickstart

This quickstart shows how to open a rosbag (ROS 1 or ROS 2), iterate over its messages, and deserialize them with the high-level `AnyReader` interface. It passes an explicit type store as a fallback, which matters for correct deserialization when the bag does not contain complete message definitions or when it uses custom message types. Replace `test_rosbag_dir` with the actual path to your rosbag.

from pathlib import Path
from rosbags.highlevel import AnyReader
from rosbags.typesys import Stores, get_typestore

# Path to a ROS 2 bag directory or a ROS 1 .bag file.
# Placeholder: replace with your own path, e.g. Path('./path/to/my_rosbag').
bagpath = Path('test_rosbag_dir')

# Create a type store to use if the bag has no message definitions or for custom types.
# ROS2_FOXY is an example; choose the appropriate ROS distribution if needed.
typestore = get_typestore(Stores.ROS2_FOXY)

try:
    # Create reader instance and open for reading.
    with AnyReader([bagpath], default_typestore=typestore) as reader:
        # Filter connections (topics) if desired. Here, we read all messages.
        # connections = [x for x in reader.connections if x.topic == '/imu_raw/Imu']
        
        print(f"Found {len(reader.connections)} connections (topics) in the bag.")
        for connection in reader.connections:
            print(f"  Topic: {connection.topic}, Type: {connection.msgtype}")

        message_count = 0
        for connection, timestamp, rawdata in reader.messages():
            # Deserialize the message data
            msg = reader.deserialize(rawdata, connection.msgtype)
            # Print the frame ID for messages that carry a header; otherwise
            # print the whole message. The hasattr checks already cover
            # messages without a header, so no extra exception handling is needed.
            # Adapt this to the actual message types in your bag.
            if hasattr(msg, 'header') and hasattr(msg.header, 'frame_id'):
                print(f"[{timestamp}] Topic: {connection.topic}, Frame ID: {msg.header.frame_id}")
            else:
                print(f"[{timestamp}] Topic: {connection.topic}, Message: {msg}")
            message_count += 1
            if message_count >= 10: # Limit output for brevity
                print("... (truncated after 10 messages)")
                break
except FileNotFoundError:
    print(f"Error: Bag file or directory not found at {bagpath}. Please create or specify a valid path.")
except Exception as e:
    print(f"An error occurred: {e}")
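
The quickstart leaves topic filtering commented out and prints raw timestamps. The sketch below shows one way to handle both with the standard library only. It assumes that the timestamps yielded by `reader.messages()` are integer nanoseconds since the Unix epoch. The helper names `select_connections` and `ns_to_datetime` are hypothetical and not part of rosbags, and the `SimpleNamespace` objects merely stand in for the entries of `reader.connections` so the example runs without a bag.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def select_connections(connections, topics):
    """Return the connections whose .topic is one of the given topic names."""
    wanted = set(topics)
    return [c for c in connections if c.topic in wanted]


def ns_to_datetime(timestamp_ns):
    """Convert integer nanoseconds since the Unix epoch to an aware UTC datetime.

    datetime has microsecond resolution, so sub-microsecond digits are dropped.
    """
    seconds, nanos = divmod(timestamp_ns, 1_000_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=nanos // 1000
    )


# Hypothetical stand-ins for reader.connections (assumption: only the
# .topic and .msgtype attributes are needed for this illustration).
demo_connections = [
    SimpleNamespace(topic='/imu_raw/Imu', msgtype='sensor_msgs/msg/Imu'),
    SimpleNamespace(topic='/tf', msgtype='tf2_msgs/msg/TFMessage'),
]

selected = select_connections(demo_connections, ['/imu_raw/Imu'])
print([c.topic for c in selected])

stamp = ns_to_datetime(1_700_000_000_123_456_789)
print(stamp.isoformat())
```

With a real bag, the filtered list would be passed to the reader as `reader.messages(connections=selected)`, and `ns_to_datetime(timestamp)` could replace the raw `timestamp` in the print statements.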
