Foxglove Python SDK

0.21.0 · active · verified Thu Apr 16

The Foxglove Python SDK provides tools for streaming data (e.g., sensor data, robot state, custom messages) from Python applications to Foxglove Studio for visualization and debugging. It's actively maintained with regular releases, currently at version 0.21.0, targeting Python 3.10 and newer, leveraging asynchronous operations for efficient data transfer.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to connect to a Foxglove WebSocket server, define a new stream with a JSON schema, and publish messages asynchronously. Ensure you have a Foxglove WebSocket server running (e.g., locally via `npx @foxglove/ws-server` if you have Node.js installed, or connect to a cloud endpoint).

import asyncio
import os
import time
import json
from foxglove_sdk import FoxgloveClient
from foxglove_sdk.client import AddStreamRequest, ClientError
from foxglove_sdk.base import MessageEvent

async def main():
    # Connect to a local Foxglove WebSocket server (e.g., `npx @foxglove/ws-server`)
    # or a Foxglove Cloud stream endpoint.
    websocket_url = os.environ.get("FOXGLOVE_WEBSOCKET_URL", "ws://localhost:8765/")
    print(f"Attempting to connect to Foxglove WebSocket at {websocket_url}")

    client = FoxgloveClient(websocket_url)
    try:
        await client.connect()
        print("Successfully connected to Foxglove WebSocket.")

        # Define a simple JSON schema for a counter stream
        stream_id = await client.add_stream(
            AddStreamRequest(
                topic="my_counter",
                encoding="json",
                schemaName="example.Counter",
                schema=json.dumps({
                    "type": "object",
                    "properties": {
                        "count": {"type": "integer"},
                        "timestamp": {"type": "number"}
                    },
                    "required": ["count", "timestamp"]
                })
            )
        )
        print(f"Stream 'my_counter' (ID: {stream_id}) added.")

        # Publish 10 counter messages
        for i in range(10):
            current_time_ns = time.time_ns() # Timestamp in nanoseconds
            message_payload = {"count": i, "timestamp": current_time_ns / 1e9}
            await client.send_message(
                MessageEvent(
                    timestamp=current_time_ns,
                    streamId=stream_id,
                    payload=json.dumps(message_payload).encode("utf8") # Payload must be bytes
                )
            )
            print(f"Published message {i}: {message_payload}")
            await asyncio.sleep(1) # Wait for 1 second before next message

    except ClientError as e:
        print(f"Foxglove Client Error: {e}")
        print("Please ensure a Foxglove WebSocket server is running and accessible (e.g., `npx @foxglove/ws-server`).")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        if client.connected:
            await client.disconnect()
            print("Disconnected from Foxglove WebSocket.")

if __name__ == "__main__":
    asyncio.run(main())

view raw JSON →