httpx-sse
httpx-sse is a Python library for consuming Server-Sent Events (SSE) with the HTTPX client. It provides synchronous and asynchronous helpers (`connect_sse` and `aconnect_sse`) that open a connection to an SSE endpoint and iterate over the events it sends. The current version is 0.4.3, and the library is actively maintained.
Warnings
- breaking Support for Python 3.7 was dropped because that version reached end-of-life.
- gotcha `SSEError` for a response whose Content-Type is not `text/event-stream` is now raised by `iter_sse()`/`aiter_sse()` rather than by `connect_sse()`/`aconnect_sse()`. This allows inspecting the raw response before the content type is validated.
- gotcha Incorrect newline parsing that was not compliant with the SSE specification was fixed. If you were working around previous parsing quirks, your code might need adjustment.
- gotcha The improved line parsing introduced a performance issue in `0.4.2`, which was fixed in a subsequent release.
Install
- pip install httpx-sse
Imports
- connect_sse
from httpx_sse import connect_sse
- aconnect_sse
from httpx_sse import aconnect_sse
- ServerSentEvent
from httpx_sse import ServerSentEvent
Quickstart
import asyncio
import os

import httpx
from httpx_sse import aconnect_sse


async def consume_sse_stream(url: str):
    # SSE streams are long-lived, so disable the timeout (timeout=None) or
    # set a generous one, e.g. timeout=httpx.Timeout(60.0, connect=5.0).
    async with httpx.AsyncClient(timeout=None) as client:
        try:
            async with aconnect_sse(client, "GET", url) as event_source:
                async for sse in event_source.aiter_sse():
                    print(f"Event: {sse.event}, Data: {sse.data}, ID: {sse.id}, Retry: {sse.retry}")
                    # Stop once the server signals the end of the stream.
                    if sse.event == "end":
                        break
        except httpx.RequestError as exc:
            # Report the URL we were given: exc.request is not always set
            # (for example on SSEError), and reading it then raises RuntimeError.
            print(f"An error occurred while requesting {url!r}: {exc}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")


# Example usage (replace with your actual SSE endpoint).
# For local testing, you might run a simple SSE server.
SSE_ENDPOINT_URL = os.environ.get("SSE_TEST_URL", "http://localhost:8000/sse")

if __name__ == "__main__":
    print(f"Connecting to SSE endpoint: {SSE_ENDPOINT_URL}")
    asyncio.run(consume_sse_stream(SSE_ENDPOINT_URL))