aiochannel

1.3.0 verified Thu Apr 16 auth: no python

aiochannel provides asyncio-compatible channels (closable queues) inspired by Go's concurrency model. Its `Channel` mirrors much of the `asyncio.Queue` API and adds the notion of a channel being 'closed and drained': once closed, a channel accepts no further items, and it counts as finished only when the remaining items have been retrieved. The library is actively maintained; the current version is 1.3.0, and releases appear as needed for fixes or minor enhancements.

pip install aiochannel
error TypeError: object Channel cannot be awaited
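cause Awaiting the `Channel` object itself (e.g., `await ch`) instead of one of its coroutine methods such as `put()` or `get()`. Note that calling those methods without `await` does not raise this error: the call returns a coroutine object that never runs, and Python emits a `RuntimeWarning` saying the coroutine was never awaited.
fix
Await the coroutine methods, not the channel: `await ch.put(item)`, `item = await ch.get()`. Make sure every call to an async method or `async def` function is prefixed with `await`, and start top-level code with `asyncio.run()`.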
error aiochannel.ChannelClosed: Channel is closed
cause Attempting to `put()` an item into a channel after `channel.close()` has been called, or attempting to `get()` from a channel that has been closed and is already empty.
fix
Design your producer logic to close the channel only when no more items will be sent. Consumers should handle the `ChannelClosed` exception, often by breaking out of a loop, or use `async for`, which handles this implicitly.
gotcha Closing an `aiochannel.Channel` with `channel.close()` is permanent; a closed channel cannot be reopened. Subsequent calls to `put()` will raise `ChannelClosed`.
fix Design your application logic so that channel closure is an irreversible signal for no more data. If a channel needs to be 'reset', create a new `Channel` instance.
gotcha The `channel.join()` method in `aiochannel.Channel` behaves differently from `asyncio.Queue.join()`. `aiochannel.Channel.join()` waits until the channel is *both* closed (`.close()` has been called) and completely drained (all items have been retrieved). `asyncio.Queue.join()` instead returns once every item that was put has been marked complete with `task_done()`; it has no notion of the queue being closed.
fix Call `channel.close()` when production is finished, then `await channel.join()` to wait until every item has been retrieved. Do not rely on `channel.empty()` alone for this purpose: an empty channel that is still open can receive more items.
gotcha `aiochannel.Channel` does not implement `task_done()` or rely on it for its `join()` semantics, unlike `asyncio.Queue`. The completion signal for `aiochannel` is its 'closed and drained' state.
fix Do not attempt to call `task_done()` on an `aiochannel.Channel` instance as it is not part of its API. Rely on `channel.close()` and `channel.join()` for managing completion.
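For contrast, here is how the standard-library `asyncio.Queue` ties `join()` to `task_done()`. The sketch uses only `asyncio` and a hypothetical `main` coroutine; it shows the bookkeeping that a `Channel` replaces with `close()`.

```python
import asyncio


async def main() -> tuple:
    q: asyncio.Queue = asyncio.Queue()
    await q.put("job")

    joiner = asyncio.create_task(q.join())
    await q.get()           # the queue is now empty
    await asyncio.sleep(0)  # give the join task a chance to run
    before = joiner.done()  # still pending: task_done() has not been called

    q.task_done()           # mark the retrieved item as processed
    await joiner
    return before, joiner.done()


if __name__ == "__main__":
    print(asyncio.run(main()))
```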
deprecated Explicitly passing an `asyncio` event loop instance (e.g., `Channel(loop=my_loop)`) to the `Channel` constructor is generally discouraged as `asyncio` itself is phasing out explicit loop passing in favor of `asyncio.get_running_loop()`.
fix Omit the `loop` argument when creating a `Channel` instance. `aiochannel` will automatically use the currently running event loop.

This quickstart demonstrates creating an `aiochannel.Channel`, with a producer coroutine putting items and a consumer coroutine retrieving them using an `async for` loop. The producer closes the channel, and the `channel.join()` call ensures the main program waits until all items are consumed and the channel is fully drained.

import asyncio
from aiochannel import Channel

async def producer(ch: Channel, num_items: int):
    for i in range(num_items):
        await ch.put(f"item-{i}")
        print(f"Produced item-{i}")
    # Close only after the last put; any later put would raise ChannelClosed
    ch.close()
    print("Producer closed the channel")

async def consumer(ch: Channel):
    # async for stops on its own once the channel is closed and drained,
    # so no ChannelClosed handling is needed here
    async for item in ch:
        print(f"Consumed {item}")
    print("Consumer detected channel closed and drained.")

async def main():
    channel = Channel(10) # Create a channel with a buffer size of 10
    await asyncio.gather(
        producer(channel, 5),
        consumer(channel)
    )
    # .join() waits until the channel is both closed and drained
    await channel.join()
    print("Channel is closed and drained, main exiting.")

if __name__ == "__main__":
    asyncio.run(main())