Event-driven Data Pipelines
Eventkit is a Python library designed for creating event-driven data pipelines and enabling communication between loosely coupled components. It leverages Python's `asyncio` for seamless integration with asynchronous programming patterns. The library's interface is kept Pythonic, using familiar names and idioms. The current version is 1.0.3, with releases tied to development cycles rather than a fixed cadence, though updates appear infrequent based on PyPI history.
Common errors
-
RuntimeError: Cannot run the event loop while another loop is running
cause Attempting to run an asyncio-based `eventkit` operation (e.g., `event.run()`, `ev.Timer().run()`) in an environment like a Jupyter notebook where an asyncio event loop is already active.fixApply `nest_asyncio` at the start of your script or notebook: `import nest_asyncio; nest_asyncio.apply()`. Or, if awaiting a single result in Jupyter, use `await` directly on the callable (e.g., `await event.list()`). -
AttributeError: module 'eventkit' has no attribute 'Event'
cause This error typically occurs if you import `eventkit` without an alias, or attempt to access submodules incorrectly.fixThe standard and recommended way to import is `import eventkit as ev`. Then access the `Event` class as `ev.Event()`. Other core classes like `Sequence`, `Range`, `Zip` are also accessed via `ev.Sequence()`, `ev.Range()`, `ev.Zip()`.
Warnings
- gotcha When running `eventkit`'s asynchronous operations (e.g., `event.run()` for async pipelines, `ev.Timer().run()`) directly within a Jupyter notebook cell, you might encounter a `RuntimeError` if the asyncio event loop is already active.
- gotcha The package name 'eventkit' is shared by other, unrelated Python libraries (e.g., `pyobjc-framework-EventKit` for macOS calendar access, `outcome-eventkit` for CloudEvents). Ensure you are installing and using the correct library by `erdewit` for event-driven data pipelines.
Install
-
pip install eventkit
Imports
- Event
import eventkit as ev event = ev.Event()
- Sequence
import eventkit as ev pipeline = ev.Sequence(...)
Quickstart
import eventkit as ev
def my_listener1(value):
print(f"Listener 1 received: {value}")
def my_listener2(value):
print(f"Listener 2 processed: {value * 2}")
# Create an event
my_event = ev.Event()
# Connect listeners to the event
my_event += my_listener1
my_event += my_listener2
# Emit a value, triggering all connected listeners
my_event.emit(5)
my_event.emit(10)
# You can also use method chaining for pipelines
pipeline_event = (
ev.Sequence([1, 2, 3])
.map(lambda x: x * 10)
.enumerate()
)
# Run a synchronous pipeline
print("Pipeline output:", pipeline_event.run())