aiochannel

1.3.0 · active · verified Thu Apr 16

aiochannel provides asyncio-compatible channels (closable queues) inspired by Go's concurrency model. It extends the functionality of `asyncio.Queue` with the notion of a channel being "closed and drained": once a channel is closed, no further items can be added, while items already buffered can still be read until the channel is empty. The library is active; the current version is 1.3.0, and releases appear as needed for fixes or minor enhancements.
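To make "closed and drained" concrete, the following is a toy model built only on the standard library. It is not aiochannel's implementation or API: the names `TinyChannel` and `Closed` are hypothetical, the queue is unbounded, and `close()` is a coroutine here purely to keep the sketch short.

```python
import asyncio
from collections import deque


class Closed(Exception):
    """Hypothetical stand-in for a 'channel closed' error."""


class TinyChannel:
    """Toy unbounded closable queue; illustrates the semantics only."""

    def __init__(self):
        self._items = deque()
        self._closed = False
        self._changed = asyncio.Condition()

    async def put(self, item):
        async with self._changed:
            if self._closed:
                # Closed channels accept no new items.
                raise Closed("cannot put into a closed channel")
            self._items.append(item)
            self._changed.notify_all()

    async def get(self):
        async with self._changed:
            # Wait until an item is available or the channel has been closed.
            await self._changed.wait_for(
                lambda: bool(self._items) or self._closed
            )
            if self._items:
                return self._items.popleft()
            # Closed and nothing left to read: the channel is drained.
            raise Closed("channel is closed and drained")

    async def close(self):
        async with self._changed:
            self._closed = True
            # Wake any getters blocked on an empty channel.
            self._changed.notify_all()


async def demo():
    ch = TinyChannel()
    await ch.put("a")
    await ch.put("b")
    await ch.close()

    events = []
    try:
        await ch.put("c")
    except Closed:
        events.append("put rejected")

    # Items buffered before the close can still be read.
    events.append(await ch.get())
    events.append(await ch.get())

    try:
        await ch.get()
    except Closed:
        events.append("drained")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is the ordering: closing rejects new writes immediately, but reads only fail after the buffer is empty.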

Common errors

Warnings

Install

Imports

Quickstart

This quickstart creates an `aiochannel.Channel` and runs a producer coroutine that puts items alongside a consumer coroutine that reads them with an `async for` loop. `asyncio.gather` waits for both coroutines. The producer closes the channel when it is done, which ends the consumer's loop once the remaining items have been read. The final `channel.join()` call waits until the channel is both closed and drained.

import asyncio
from aiochannel import Channel

async def producer(ch: Channel, num_items: int):
    for i in range(num_items):
        await ch.put(f"item-{i}")
        print(f"Produced item-{i}")
    # Closing signals consumers that no more items will arrive
    ch.close()
    print("Producer closed the channel")

async def consumer(ch: Channel):
    # Iteration ends on its own once the channel is closed and drained
    async for item in ch:
        print(f"Consumed {item}")
    print("Consumer detected channel closed and drained.")

async def main():
    channel = Channel(10) # Create a channel with a buffer size of 10
    await asyncio.gather(
        producer(channel, 5),
        consumer(channel)
    )
    # .join() waits until the channel is both closed and drained
    await channel.join()
    print("Channel is closed and drained, main exiting.")

if __name__ == "__main__":
    asyncio.run(main())
