aiochannel
raw JSON → 1.3.0 verified Thu Apr 16 auth: no python
aiochannel provides asyncio-compatible channels (closable queues) inspired by Go's concurrency model. It extends the functionality of `asyncio.Queue` by introducing a concept of a channel being 'closed and drained', guaranteeing that no further items can be added once closed. The library is active, with the current version being 1.3.0, and typically sees releases as needed for fixes or minor enhancements.
pip install aiochannel Common errors
error TypeError: object Channel cannot be awaited ↓
cause Attempting to call an asynchronous method (e.g., `put`, `get`) on a Channel object without `await` inside an `async def` function, or calling an `async def` function as if it were a regular function.
fix
Ensure all calls to
async methods or async def functions are prefixed with await. For top-level execution, use asyncio.run(). error aiochannel.ChannelClosed: Channel is closed ↓
cause Attempting to `put()` an item into a channel after `channel.close()` has been called, or attempting to `get()` from a channel that has been closed and is already empty.
fix
Design your producer logic to close the channel only when no more items will be sent. Consumers should handle the
ChannelClosed exception, often by breaking out of a loop, or use async for which handles this implicitly. Warnings
gotcha Once an `aiochannel.Channel` is closed using `channel.close()`, it is a permanent state and cannot be reopened. Subsequent calls to `put()` will raise `ChannelClosed`. ↓
fix Design your application logic so that channel closure is an irreversible signal for no more data. If a channel needs to be 'reset', create a new `Channel` instance.
gotcha The `channel.join()` method in `aiochannel.Channel` behaves differently from `asyncio.Queue.join()`. `aiochannel.Channel.join()` waits until the channel is *both* closed (`.close()` has been called) and completely drained (all items have been retrieved). `asyncio.Queue.join()` only waits until the queue is empty. ↓
fix Always use `channel.join()` when you need to ensure all processing is complete and the channel is empty, after calling `channel.close()`. Do not rely on `channel.empty()` alone for this purpose if `close()` is involved.
gotcha `aiochannel.Channel` does not implement `task_done()` or rely on it for its `join()` semantics, unlike `asyncio.Queue`. The completion signal for `aiochannel` is its `closed and drained` state. ↓
fix Do not attempt to call `task_done()` on an `aiochannel.Channel` instance as it is not part of its API. Rely on `channel.close()` and `channel.join()` for managing completion.
deprecated Explicitly passing an `asyncio` event loop instance (e.g., `Channel(loop=my_loop)`) to the `Channel` constructor is generally discouraged as `asyncio` itself is phasing out explicit loop passing in favor of `asyncio.get_running_loop()`. ↓
fix Omit the `loop` argument when creating a `Channel` instance. `aiochannel` will automatically use the currently running event loop.
Imports
- Channel
from aiochannel import Channel
Quickstart
import asyncio
from aiochannel import Channel, ChannelClosed
async def producer(ch: Channel, num_items: int):
for i in range(num_items):
await ch.put(f"item-{i}")
print(f"Produced item-{i}")
ch.close()
print("Producer closed the channel")
async def consumer(ch: Channel):
try:
async for item in ch:
print(f"Consumed {item}")
except ChannelClosed:
print("Consumer detected channel closed and drained.")
async def main():
channel = Channel(10) # Create a channel with a buffer size of 10
await asyncio.gather(
producer(channel, 5),
consumer(channel)
)
# .join() waits until the channel is both closed and drained
await channel.join()
print("Channel is closed and drained, main exiting.")
if __name__ == "__main__":
asyncio.run(main())