httpx-sse

0.4.3 · active · verified Sat Mar 28

httpx-sse is a Python library for consuming Server-Sent Events (SSE) with the HTTPX client. It provides synchronous and asynchronous helpers (`connect_sse` and `aconnect_sse`) that connect to an SSE endpoint and iterate over the received events. The current version is 0.4.3, and the library is actively maintained.

Warnings

Install

Imports

Quickstart

Opens an asynchronous connection to an SSE endpoint with `aconnect_sse` and iterates over the incoming `ServerSentEvent` objects. The example handles request errors and disables the HTTPX client timeout, which matters for long-lived SSE connections: HTTPX's default 5-second timeout would otherwise end a stream that stays quiet for longer than that.

import asyncio
import os

import httpx
from httpx_sse import aconnect_sse, ServerSentEvent

async def consume_sse_stream(url: str):
    # timeout=None disables all HTTPX timeouts, which suits a stream that is
    # expected to run indefinitely.
    # Alternative: httpx.Timeout(60.0, connect=5.0) caps connection setup at 5 s,
    # but raises a read timeout if the server sends nothing for 60 s.
    async with httpx.AsyncClient(timeout=None) as client:
        try:
            async with aconnect_sse(client, "GET", url) as event_source:
                async for sse in event_source.aiter_sse():
                    print(f"Event: {sse.event}, Data: {sse.data}, ID: {sse.id}, Retry: {sse.retry}")
                    if sse.event == "end":
                        break
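        except httpx.RequestError as exc:
            # Report the url argument: exc.request is not guaranteed to be set,
            # and reading it when unset raises RuntimeError.
            print(f"An error occurred while requesting {url!r}: {exc}")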
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

# Example usage (replace with your actual SSE endpoint)
# For local testing, you might run a simple SSE server.
SSE_ENDPOINT_URL = os.environ.get("SSE_TEST_URL", "http://localhost:8000/sse")

if __name__ == "__main__":
    print(f"Connecting to SSE endpoint: {SSE_ENDPOINT_URL}")
    asyncio.run(consume_sse_stream(SSE_ENDPOINT_URL))
