SignalR Core Python Client

1.0.2 · active · verified Thu Apr 09

signalrcore is a full-featured Python client for SignalR Core hubs, offering support for various transports (WebSockets, Server-Sent Events, Long Polling) and encodings (JSON, MessagePack). It's designed to be compatible with Azure SignalR Service and serverless functions, and includes robust automatic and manual reconnection capabilities. The library abstracts away the complexities of SignalR protocol negotiation, transport fallback, and message dispatching, allowing developers to focus on defining event callbacks. Currently at version 1.0.2, it receives active development and maintenance.

Warnings

Install

Imports

Quickstart

This example demonstrates how to establish a connection to a SignalR hub, set up event handlers for connection status, register a callback for a server-side method, and send messages. It includes basic logging configuration and automatic reconnection, and uses an environment variable for the hub URL for better configurability. Remember to replace `http://localhost:5000/hub` with your actual SignalR hub URL.

import logging
import time
import os
from signalrcore.hub_connection_builder import HubConnectionBuilder

# Configure logging for better visibility
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)

hub_connection = HubConnectionBuilder() \
    .with_url(os.environ.get('SIGNALR_HUB_URL', 'http://localhost:5000/hub')) \
    .configure_logging(logging.DEBUG, socket_trace=True, handler_mapping={
        "httpx": logging.WARNING,
        "websocket": logging.WARNING,
        "urllib3": logging.WARNING
    }) \
    .with_automatic_reconnect({
        "type": "interval",
        "keep_alive_interval": 10,
        "intervals": [1, 2, 5, 10, 15, 30] # In seconds
    }) \
    .build()

hub_connection.on_open(lambda: print("Connection opened."))
hub_connection.on_close(lambda: print("Connection closed."))
hub_connection.on_error(lambda data: print(f"Connection error: {data.error}"))

# Define a callback for a specific method on the hub
def receive_message(args):
    print(f"Received message: {args}")

hub_connection.on("ReceiveMessage", receive_message)

print("Starting connection...")
hub_connection.start()
print("Connection started.")

# Keep the connection alive for some time and send a message
try:
    # Give some time for the connection to establish and possibly auto-reconnect
    time.sleep(5)
    if hub_connection.connected:
        print("Sending message to hub...")
        hub_connection.send("SendMessage", ["PythonClient", "Hello from Python!"])
        print("Message sent. Waiting for 30 seconds to receive messages or keep alive.")
        time.sleep(30) # Keep connection alive and listen for messages
    else:
        print("Connection not established or reconnected. Cannot send message.")

except KeyboardInterrupt:
    pass
finally:
    print("Stopping connection...")
    hub_connection.stop()
    print("Connection stopped.")

view raw JSON →