aiochannel
aiochannel provides asyncio-compatible channels (closable queues) inspired by Go's concurrency model. It extends the functionality of `asyncio.Queue` by introducing a concept of a channel being 'closed and drained', guaranteeing that no further items can be added once closed. The library is active, with the current version being 1.3.0, and typically sees releases as needed for fixes or minor enhancements.
Common errors
-
TypeError: object Channel cannot be awaited
cause Attempting to call an asynchronous method (e.g., `put`, `get`) on a Channel object without `await` inside an `async def` function, or calling an `async def` function as if it were a regular function.fixEnsure all calls to `async` methods or `async def` functions are prefixed with `await`. For top-level execution, use `asyncio.run()`. -
aiochannel.ChannelClosed: Channel is closed
cause Attempting to `put()` an item into a channel after `channel.close()` has been called, or attempting to `get()` from a channel that has been closed and is already empty.fixDesign your producer logic to close the channel only when no more items will be sent. Consumers should handle the `ChannelClosed` exception, often by breaking out of a loop, or use `async for` which handles this implicitly.
Warnings
- gotcha Once an `aiochannel.Channel` is closed using `channel.close()`, it is a permanent state and cannot be reopened. Subsequent calls to `put()` will raise `ChannelClosed`.
- gotcha The `channel.join()` method in `aiochannel.Channel` behaves differently from `asyncio.Queue.join()`. `aiochannel.Channel.join()` waits until the channel is *both* closed (`.close()` has been called) and completely drained (all items have been retrieved). `asyncio.Queue.join()` only waits until the queue is empty.
- gotcha `aiochannel.Channel` does not implement `task_done()` or rely on it for its `join()` semantics, unlike `asyncio.Queue`. The completion signal for `aiochannel` is its `closed and drained` state.
- deprecated Explicitly passing an `asyncio` event loop instance (e.g., `Channel(loop=my_loop)`) to the `Channel` constructor is generally discouraged as `asyncio` itself is phasing out explicit loop passing in favor of `asyncio.get_running_loop()`.
Install
-
pip install aiochannel
Imports
- Channel
from aiochannel import Channel
Quickstart
import asyncio
from aiochannel import Channel, ChannelClosed
async def producer(ch: Channel, num_items: int):
for i in range(num_items):
await ch.put(f"item-{i}")
print(f"Produced item-{i}")
ch.close()
print("Producer closed the channel")
async def consumer(ch: Channel):
try:
async for item in ch:
print(f"Consumed {item}")
except ChannelClosed:
print("Consumer detected channel closed and drained.")
async def main():
channel = Channel(10) # Create a channel with a buffer size of 10
await asyncio.gather(
producer(channel, 5),
consumer(channel)
)
# .join() waits until the channel is both closed and drained
await channel.join()
print("Channel is closed and drained, main exiting.")
if __name__ == "__main__":
asyncio.run(main())