Bubus (Python 3.10+ Fork)

1.3.0 · active · verified Thu Apr 16

Bubus-py310x is a fork of the `bubus` event bus library that adds Python 3.10 support by replacing uses of `asyncio.timeout` (introduced in Python 3.11) with compatible alternatives. It provides a Pydantic-powered event bus for asynchronous Python applications, with type-safe publish-subscribe, both async and sync handlers, event forwarding, and concurrency control. The current version is 1.3.0, and the fork is actively maintained.
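To illustrate the kind of substitution described above, here is a minimal sketch that bounds an awaitable with `asyncio.wait_for`, which is available on Python 3.10, instead of the `asyncio.timeout` context manager from 3.11. It is not taken from the fork's source, which may use a different alternative; the names `slow_operation` and `run_with_timeout` are hypothetical.

```python
import asyncio


async def slow_operation(delay: float) -> str:
    # Hypothetical stand-in for any awaitable that may take too long.
    await asyncio.sleep(delay)
    return "done"


async def run_with_timeout(delay: float, timeout: float) -> str:
    # On Python 3.11+ one could write:
    #     async with asyncio.timeout(timeout):
    #         return await slow_operation(delay)
    # asyncio.wait_for also works on Python 3.10 and raises
    # asyncio.TimeoutError once the deadline passes.
    try:
        return await asyncio.wait_for(slow_operation(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(0.01, 1.0)))
    print(asyncio.run(run_with_timeout(0.5, 0.01)))
```

Catching `asyncio.TimeoutError` keeps the sketch portable: on Python 3.10 it is a distinct exception class, while on 3.11 and later it is an alias of the built-in `TimeoutError`.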

Quickstart

This quickstart sets up an `EventBus`, defines `BaseEvent` models with Pydantic, registers asynchronous handlers, and dispatches and awaits events. It shows a nested dispatch from inside a handler and the use of `bus.expect` with a timeout, which this fork makes compatible with Python 3.10.

import asyncio
from bubus import EventBus, BaseEvent

class UserLoginEvent(BaseEvent[str]):
    username: str
    is_admin: bool

# Stand-in request/response events and API for demonstration; replace with a real API call if needed
class AuthRequestEvent(BaseEvent[None]):
    pass

class AuthResponseEvent(BaseEvent[str]):
    message: str

class AuthAPI:
    @staticmethod
    async def post(event: AuthRequestEvent):
        print("AuthAPI received AuthRequestEvent. Dispatching AuthResponseEvent.")
        await asyncio.sleep(0.1) # Simulate async work
        event.event_bus.dispatch(AuthResponseEvent(message="Auth successful!"))


async def handle_login(event: UserLoginEvent) -> str:
    print(f"Handler for UserLoginEvent received: {event.username}, Admin: {event.is_admin}")
    auth_request = await event.event_bus.dispatch(AuthRequestEvent()) # nested events supported
    auth_response = await event.event_bus.expect(AuthResponseEvent, timeout=30.0) # Uses 3.10 compatible timeout
    return f"User {event.username} logged in admin={event.is_admin} with API response: {auth_response.message}"

async def main():
    bus = EventBus()
    bus.on(UserLoginEvent, handle_login)
    bus.on(AuthRequestEvent, AuthAPI.post)

    print("Dispatching UserLoginEvent...")
    event_result = await bus.dispatch(UserLoginEvent(username="alice", is_admin=True)).event_result()
    print(f"Final result: {event_result}")

if __name__ == "__main__":
    asyncio.run(main())
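The "wait for an event, but give up after a timeout" pattern used by `bus.expect` in the quickstart can be sketched with the standard library alone. The `TinyBus` class, the `Ping` event, and the `demo` coroutine below are hypothetical and are not the bubus implementation; they only show one Python 3.10-compatible way to pair a future with `asyncio.wait_for`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Ping:
    payload: str


class TinyBus:
    """Hypothetical toy bus; not the bubus implementation."""

    def __init__(self) -> None:
        self._waiters: dict[type, list[asyncio.Future]] = {}

    def dispatch(self, event: object) -> None:
        # Resolve every pending waiter registered for this exact event type.
        for fut in self._waiters.pop(type(event), []):
            if not fut.done():
                fut.set_result(event)

    async def expect(self, event_type: type, timeout: float):
        # Register a future, then bound the wait with asyncio.wait_for,
        # which is available on Python 3.10.
        fut = asyncio.get_running_loop().create_future()
        self._waiters.setdefault(event_type, []).append(fut)
        return await asyncio.wait_for(fut, timeout=timeout)


async def demo() -> tuple[str, bool]:
    bus = TinyBus()
    loop = asyncio.get_running_loop()
    # Dispatch shortly after the waiter below has been registered.
    loop.call_later(0.01, bus.dispatch, Ping(payload="hello"))
    received = await bus.expect(Ping, timeout=1.0)
    try:
        # No further Ping is dispatched, so this wait times out.
        await bus.expect(Ping, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return received.payload, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this toy version a waiter only sees events dispatched after it was registered, so the order of `expect` and `dispatch` matters.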
