aeventkit: Event-Driven Data Pipelines

raw JSON →
2.1.0 verified Fri Apr 17 auth: no python

aeventkit is an active Python library (current version 2.1.0) for building event-driven data pipelines. It leverages Pydantic v2 for robust data validation and AnyIO for asynchronous execution, making it suitable for modern, high-performance applications. The library focuses on clear separation of concerns with publishers, subscribers, and an event bus.

pip install aeventkit
error RuntimeError: Event loop is already running
cause Attempting to call `anyio.run()` or `asyncio.run()` when an event loop is already active, for example, in a Jupyter notebook or if `anyio.run()` is called multiple times.
fix
Ensure anyio.run() is called only once at the root of your application. If integrating with an existing async application, directly await your main function rather than calling anyio.run().
error pydantic.v1_error.PydanticImportError: Pydantic V2 is not installed. aeventkit requires pydantic >= 2.0.0.
cause The installed version of Pydantic is v1.x, or Pydantic v2 is not correctly installed/accessible.
fix
Upgrade Pydantic: pip install 'pydantic>=2.0.0' --upgrade. If issues persist, check your environment for conflicting Pydantic versions from other dependencies.
error AttributeError: 'NoneType' object has no attribute 'publish'
cause An `EventPublisher` instance was created but not registered with an `EventBus` before attempting to publish an event.
fix
Before publishing, ensure you have called bus.register_publisher(your_publisher_instance) where bus is your EventBus instance.
gotcha aeventkit is designed for asynchronous operations using AnyIO. All core components and interaction methods are `async`/`await` compatible. Attempting to call async methods from synchronous code without an event loop or proper execution context will result in errors.
fix Ensure your application's entry point uses `anyio.run()` or integrates within an existing `asyncio` or `anyio` event loop. All event handlers and interactions should be `async` functions.
breaking aeventkit strictly requires Pydantic v2 (>=2.0.0). Projects using Pydantic v1 will encounter `PydanticImportError` or validation issues due to API changes between Pydantic versions.
fix Upgrade Pydantic to version 2 or higher (`pip install 'pydantic>=2.0.0' --upgrade`). Verify no other project dependencies are pinning Pydantic to v1.
gotcha Publishers and Subscribers must be registered with an `EventBus` instance to communicate. Creating an `EventPublisher` or `EventSubscriber` alone does not make them active participants in the event stream.
fix Always call `bus.register_publisher(publisher_instance)` and `bus.register_subscriber(subscriber_instance)` after initializing your bus and components.

This example demonstrates how to define a custom Event, create a Publisher and Subscriber, register them with an EventBus, and publish an event. It uses `anyio.run()` to execute the asynchronous main function, which is the standard entry point for aeventkit applications.

import anyio
from aeventkit import Event, EventBus, EventSubscriber, EventPublisher
from pydantic import Field

class MyEvent(Event):
    message: str = Field(..., description="A simple test message")

class MySubscriber(EventSubscriber):
    async def handle(self, event: MyEvent):
        print(f"Subscriber received: {event.message}")

async def main():
    bus = EventBus()
    subscriber = MySubscriber()
    publisher = EventPublisher()

    bus.register_subscriber(subscriber)
    bus.register_publisher(publisher)

    print("Publishing event...")
    await publisher.publish(MyEvent(message="Hello, aeventkit!"))
    await anyio.sleep(0.1) # Give subscriber a moment to process
    print("Event published and potentially handled.")

if __name__ == "__main__":
    anyio.run(main)