MAVLink Protocol Library for Python

2.4.49 · active · verified Tue Apr 14

pymavlink is the official Python MAVLink (Micro Air Vehicle Link) protocol implementation. It provides tools for parsing, generating, and communicating MAVLink messages, enabling Python applications to interact with drones, autopilots, and ground control stations. The library is actively maintained by the ArduPilot community and is currently at version 2.4.49, with frequent updates to add features and fix bugs.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to establish a MAVLink connection, request a data stream, and receive and process messages from a MAVLink-enabled device (e.g., a drone autopilot or a SITL simulator). It connects to a configurable MAVLink endpoint, requests all data streams, and then listens for and prints HEARTBEAT and ATTITUDE messages for 10 seconds. Ensure a MAVLink source is available at the specified connection string for the script to function.

import os
import time
from pymavlink import mavutil

# Define the connection string for your MAVLink device
# Examples: 'udp:127.0.0.1:14550' (for SITL), 'serial:/dev/ttyACM0:115200' (for a USB serial device)
# You can set the MAVLINK_CONNECTION_STRING environment variable or change this directly.
connection_string = os.environ.get('MAVLINK_CONNECTION_STRING', 'udp:127.0.0.1:14550')

print(f"Attempting to connect to MAVLink device via: {connection_string}...")

try:
    # Establish a MAVLink connection. wait_ready=True blocks until the first HEARTBEAT is received.
    master = mavutil.mavlink_connection(connection_string, baud=115200, wait_ready=True)
    print(f"Connection established! System ID: {master.target_system}, Component ID: {master.target_component}")

    # Request an ATTITUDE data stream at 1Hz
    # MAV_DATA_STREAM_ALL for all streams, or specific ones like MAV_DATA_STREAM_EXTRA1 for ATTITUDE
    # The last '1' is the rate in Hz, '0' would stop the stream.
    master.mav.request_data_stream_send(
        master.target_system, master.target_component, 
        mavutil.mavlink.MAV_DATA_STREAM_ALL, 1, 1
    )
    print("Requested data stream for all messages at 1Hz.")

    print("Listening for HEARTBEAT and ATTITUDE messages for 10 seconds...")
    start_time = time.time()
    while (time.time() - start_time) < 10: # Listen for 10 seconds
        # Non-blocking receive for specific message types
        msg = master.recv_match(type=['HEARTBEAT', 'ATTITUDE'], blocking=False, timeout=0.1)
        if msg:
            print(f"Received {msg.get_type()} - {msg}")
            if msg.get_type() == 'ATTITUDE':
                print(f"  Roll: {msg.roll:.2f} deg, Pitch: {msg.pitch:.2f} deg, Yaw: {msg.yaw:.2f} deg")
        time.sleep(0.01) # Small delay to prevent busy-waiting

    print("Finished listening for messages.")

except Exception as e:
    print(f"Error during MAVLink communication: {e}")

view raw JSON →