Foxglove Python SDK
The Foxglove Python SDK provides tools for streaming data (e.g., sensor data, robot state, custom messages) from Python applications to Foxglove Studio for visualization and debugging. It is actively maintained with regular releases. The current version is 0.21.0, which targets Python 3.10 and newer and uses `asyncio` for data transfer.
Common errors
- ImportError: cannot import name 'Publisher' from 'foxglove_sdk'
  cause: Attempting to import the `Publisher` class, which was removed in `foxglove-sdk` v0.20.0.
  fix: Remove the `Publisher` import and any `client.publisher` calls. Create a `FoxgloveClient` instance and call `client.send_message()` directly.
- ImportError: cannot import name 'Client' from 'foxglove_sdk.client'
  cause: Attempting to import the `Client` class, which was renamed to `FoxgloveClient` and moved to the top-level package in v0.20.0.
  fix: Update the import statement to `from foxglove_sdk import FoxgloveClient`.
- RuntimeError: Event loop is already running
  cause: Calling `asyncio.run()` or `loop.run_until_complete()` while an `asyncio` event loop is already running in the current thread (e.g., in a Jupyter notebook or inside another async context).
  fix: In an interactive environment or an existing event loop, do not start a second loop. Either `await main()` directly (Jupyter supports top-level `await`) or schedule the coroutine on the running loop, e.g., `asyncio.get_running_loop().create_task(main())`. Call `asyncio.run()` only from synchronous code where no loop is running.
- foxglove_sdk.client.exceptions.ClientError: Failed to register stream
  cause: The WebSocket server rejected the stream registration, often because of an invalid or malformed schema or a topic name conflict.
  fix: Review the `schemaName` and `schema` provided in your `AddStreamRequest`. Ensure the `schema` is valid JSON (or a Protobuf schema string) and correctly describes the data you intend to send. Check the server logs for more specific errors.
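The event-loop fix above can be sketched with the standard library alone. This is a minimal sketch and not part of the SDK: `run_or_schedule` is a hypothetical helper name, and `main` stands in for your own coroutine.

```python
import asyncio


async def main() -> str:
    # Placeholder for real work such as connecting and publishing.
    await asyncio.sleep(0)
    return "done"


def run_or_schedule(coro):
    """Run `coro` to completion if no loop is running, else schedule it.

    Hypothetical helper: returns the coroutine's result when called from
    synchronous code, or an asyncio.Task when called inside a running loop.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro)
    # A loop is already running (e.g., Jupyter): schedule instead of nesting.
    return loop.create_task(coro)
```

From a plain script, `run_or_schedule(main())` returns the coroutine's result. Inside a notebook cell or another coroutine it returns a task, which the caller should `await` to get the result and surface any exception.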
Warnings
- breaking: In version 0.20.0 the `Publisher` class was removed and the main client class was renamed from `Client` to `FoxgloveClient`. The API was simplified so that messages are sent directly through the client.
- breaking: In version 0.18.0 the term 'Channel' was renamed to 'Stream' across the entire API. This affects class names (e.g., `AddChannelRequest` became `AddStreamRequest`) and method parameters.
- gotcha: The Foxglove SDK is built on `asyncio`. All network operations are asynchronous and must be `await`ed within an `async` function. Calling an async method without `await` only creates a coroutine object that never runs, which Python typically reports as `RuntimeWarning: coroutine ... was never awaited`, so no data is sent.
- gotcha: Data payloads sent via `send_message` must be `bytes`, not raw Python dicts or strings. The schema defined during `add_stream` must match the structure of the data you send.
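The bytes requirement in the last warning can be illustrated without the SDK. The following is a minimal sketch, assuming a JSON-encoded stream. `encode_payload` and `COUNTER_SCHEMA` are hypothetical names, and the check covers only the schema's `required` keys, not full JSON Schema validation.

```python
import json

# Hypothetical schema for illustration, shaped like a JSON Schema object.
COUNTER_SCHEMA = {
    "type": "object",
    "properties": {
        "count": {"type": "integer"},
        "timestamp": {"type": "number"},
    },
    "required": ["count", "timestamp"],
}


def encode_payload(message: dict, schema: dict) -> bytes:
    """Check required keys, then serialize the dict to UTF-8 JSON bytes."""
    missing = [key for key in schema.get("required", []) if key not in message]
    if missing:
        raise ValueError(f"payload is missing required fields: {missing}")
    # json.dumps returns str; .encode() produces the bytes a sender expects.
    return json.dumps(message).encode("utf-8")


payload = encode_payload({"count": 1, "timestamp": 2.0}, COUNTER_SCHEMA)
```

Failing early on a missing field keeps a mismatched message from reaching the server, where the resulting error is usually harder to trace back to its source.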
Install
- pip install foxglove-sdk
Imports
- FoxgloveClient
  before v0.20.0: from foxglove_sdk.client import Client
  current: from foxglove_sdk import FoxgloveClient
- AddStreamRequest
  before v0.18.0: from foxglove_sdk.client import AddChannelRequest
  current: from foxglove_sdk.client import AddStreamRequest
- MessageEvent
  from foxglove_sdk.base import MessageEvent
Quickstart
import asyncio
import os
import time
import json
from foxglove_sdk import FoxgloveClient
from foxglove_sdk.client import AddStreamRequest, ClientError
from foxglove_sdk.base import MessageEvent


async def main():
    # Connect to a local Foxglove WebSocket server (e.g., `npx @foxglove/ws-server`)
    # or a Foxglove Cloud stream endpoint.
    websocket_url = os.environ.get("FOXGLOVE_WEBSOCKET_URL", "ws://localhost:8765/")
    print(f"Attempting to connect to Foxglove WebSocket at {websocket_url}")
    client = FoxgloveClient(websocket_url)
    try:
        await client.connect()
        print("Successfully connected to Foxglove WebSocket.")

        # Define a simple JSON schema for a counter stream
        stream_id = await client.add_stream(
            AddStreamRequest(
                topic="my_counter",
                encoding="json",
                schemaName="example.Counter",
                schema=json.dumps({
                    "type": "object",
                    "properties": {
                        "count": {"type": "integer"},
                        "timestamp": {"type": "number"}
                    },
                    "required": ["count", "timestamp"]
                })
            )
        )
        print(f"Stream 'my_counter' (ID: {stream_id}) added.")

        # Publish 10 counter messages
        for i in range(10):
            current_time_ns = time.time_ns()  # Timestamp in nanoseconds
            message_payload = {"count": i, "timestamp": current_time_ns / 1e9}
            await client.send_message(
                MessageEvent(
                    timestamp=current_time_ns,
                    streamId=stream_id,
                    payload=json.dumps(message_payload).encode("utf8")  # Payload must be bytes
                )
            )
            print(f"Published message {i}: {message_payload}")
            await asyncio.sleep(1)  # Wait for 1 second before next message
    except ClientError as e:
        print(f"Foxglove Client Error: {e}")
        print("Please ensure a Foxglove WebSocket server is running and accessible (e.g., `npx @foxglove/ws-server`).")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        if client.connected:
            await client.disconnect()
            print("Disconnected from Foxglove WebSocket.")


if __name__ == "__main__":
    asyncio.run(main())
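Because a malformed schema string is one listed cause of a rejected stream registration, it may help to check the string locally before passing it to `add_stream`. The following is a sketch using only the standard library. `check_schema_string` is a hypothetical helper, not an SDK function, and it only confirms that the string parses to a JSON object.

```python
import json


def check_schema_string(schema: str) -> dict:
    """Parse a JSON schema string and return it as a dict.

    Raises ValueError if the string is not valid JSON or is not an object.
    This does not validate the schema against the JSON Schema specification.
    """
    try:
        parsed = json.loads(schema)
    except json.JSONDecodeError as exc:
        raise ValueError(f"schema is not valid JSON: {exc}") from exc
    if not isinstance(parsed, dict):
        raise ValueError("schema must be a JSON object")
    return parsed


schema_text = json.dumps({
    "type": "object",
    "properties": {"count": {"type": "integer"}},
    "required": ["count"],
})
parsed_schema = check_schema_string(schema_text)
```

Running this check before connecting separates local schema mistakes from server-side rejections such as topic name conflicts.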