aeventkit: Event-Driven Data Pipelines
aeventkit is an actively maintained Python library (current version 2.1.0) for building event-driven data pipelines. It uses Pydantic v2 for data validation and AnyIO for asynchronous execution. Its API is organized around three components: publishers, subscribers, and an event bus that connects them.
Common errors
- RuntimeError: Event loop is already running
  Cause: `anyio.run()` or `asyncio.run()` was called while an event loop was already active, for example in a Jupyter notebook or from code that is itself running inside `anyio.run()`.
  Fix: Call `anyio.run()` only once, at the root of your application. When integrating with an existing async application, `await` your main function directly instead of calling `anyio.run()`.
- pydantic.v1_error.PydanticImportError: Pydantic V2 is not installed. aeventkit requires pydantic >= 2.0.0.
  Cause: The installed version of Pydantic is v1.x, or Pydantic v2 is not correctly installed or not accessible in the active environment.
  Fix: Upgrade Pydantic with `pip install 'pydantic>=2.0.0' --upgrade`. If the error persists, check whether another dependency pins a conflicting Pydantic version.
- AttributeError: 'NoneType' object has no attribute 'publish'
  Cause: An `EventPublisher` instance was created but not registered with an `EventBus` before it was used to publish an event.
  Fix: Before publishing, call `bus.register_publisher(your_publisher_instance)`, where `bus` is your `EventBus` instance.
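The first error above comes down to whether an event loop is already running in the current thread. The following is a minimal sketch of guarding an entry point against that case. It uses only the standard-library `asyncio` module (the backend AnyIO uses by default) rather than aeventkit itself, and `main`, `in_running_loop`, and `run_entrypoint` are hypothetical names chosen for illustration.

```python
import asyncio


async def main() -> str:
    # Stand-in for an application's real async entry point.
    await asyncio.sleep(0)
    return "done"


def in_running_loop() -> bool:
    """Return True if the calling thread already has a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running.
        return False
    return True


def run_entrypoint() -> str:
    """Start a loop only when none is running; otherwise ask the caller to await."""
    if in_running_loop():
        raise RuntimeError("A loop is already running; use `await main()` instead.")
    return asyncio.run(main())


if __name__ == "__main__":
    print(run_entrypoint())
```

In a notebook or inside another coroutine, `in_running_loop()` returns `True`, which is the signal to write `await main()` rather than start a second loop.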
Warnings
- gotcha aeventkit is designed for asynchronous operations using AnyIO. All core components and interaction methods are `async`/`await` compatible. Attempting to call async methods from synchronous code without an event loop or proper execution context will result in errors.
- breaking aeventkit strictly requires Pydantic v2 (>=2.0.0). Projects using Pydantic v1 will encounter `PydanticImportError` or validation issues due to API changes between Pydantic versions.
- gotcha Publishers and Subscribers must be registered with an `EventBus` instance to communicate. Creating an `EventPublisher` or `EventSubscriber` alone does not make them active participants in the event stream.
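Because the Pydantic v2 requirement is strict, it can help to check the installed version before importing the library. The following is a minimal sketch using the standard-library `importlib.metadata`. The helper names are hypothetical, and this check is an illustration rather than something aeventkit is known to provide.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def major_version(version_string: str) -> int:
    """Extract the leading major number from a version string such as '2.6.1'."""
    return int(version_string.split(".")[0])


def installed_pydantic_version() -> Optional[str]:
    """Return the installed Pydantic version string, or None if it is missing."""
    try:
        return version("pydantic")
    except PackageNotFoundError:
        return None


def pydantic_is_v2() -> bool:
    """Return True only when Pydantic is installed with major version 2 or newer."""
    found = installed_pydantic_version()
    return found is not None and major_version(found) >= 2


if __name__ == "__main__":
    print("pydantic:", installed_pydantic_version(), "| v2 or newer:", pydantic_is_v2())
```

Running this in the same environment as the application shows whether a v1 pin from another dependency is the likely cause of the import error.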
Install
- pip install aeventkit
Imports
- Event
from aeventkit import Event
- EventPublisher
from aeventkit import EventPublisher
- EventSubscriber
from aeventkit import EventSubscriber
- EventBus
from aeventkit import EventBus
Quickstart
import anyio
from aeventkit import Event, EventBus, EventSubscriber, EventPublisher
from pydantic import Field


class MyEvent(Event):
    message: str = Field(..., description="A simple test message")


class MySubscriber(EventSubscriber):
    async def handle(self, event: MyEvent):
        print(f"Subscriber received: {event.message}")


async def main():
    bus = EventBus()
    subscriber = MySubscriber()
    publisher = EventPublisher()
    bus.register_subscriber(subscriber)
    bus.register_publisher(publisher)
    print("Publishing event...")
    await publisher.publish(MyEvent(message="Hello, aeventkit!"))
    await anyio.sleep(0.1)  # Give subscriber a moment to process
    print("Event published and potentially handled.")


if __name__ == "__main__":
    anyio.run(main)