Bubus Event Bus
Bubus is an advanced, Pydantic-powered asynchronous event bus library for Python. It facilitates decoupled communication between components by allowing them to publish and subscribe to events. It's designed for modern Python applications, emphasizing type safety, performance, and robust error handling. The current version is 1.5.6, and it receives regular updates.
Warnings
- breaking The environment variable for setting the logging level was renamed from `BUBUS_LOG_LEVEL` to `BUBUS_LOGGING_LEVEL`.
- breaking As of version 1.5.6, `bubus` now raises original exceptions directly from event handlers, rather than potentially wrapping or suppressing them in certain scenarios.
- gotcha To enforce return types for event handlers and utilize the new return type enforcement feature, `BaseEvent` should be defined as a Generic, e.g., `BaseEvent[ReturnType]`.
- gotcha When using `bubus` in multiprocessing environments, ensure you are on a recent version (>=1.4.7) to benefit from fixes related to semaphore handling and avoid potential issues with stale event loops or deleted semaphores.
Install
-
pip install bubus
Imports
- EventBus
from bubus import EventBus
- BaseEvent
from bubus import BaseEvent
Quickstart
import asyncio
from bubus import EventBus, BaseEvent
class MyEvent(BaseEvent):
message: str
async def my_handler(event: MyEvent) -> None:
print(f"Received event: {event.message}")
await asyncio.sleep(0.01) # Simulate async work
async def main():
bus = EventBus()
bus.subscribe(MyEvent, my_handler)
print("Publishing event...")
await bus.publish(MyEvent(message="Hello from Bubus!"))
# Wait for all published events to be handled
await bus.wait_for_all_handlers()
await bus.stop() # Gracefully shut down the event bus
print("Bus stopped.")
if __name__ == "__main__":
asyncio.run(main())