Culsans - Thread-safe async-aware queue for Python

0.11.0 verified Sat May 09 auth: no python

Culsans is a thread-safe, async-aware queue for Python. It is a drop-in replacement for asyncio.Queue and janus.Queue, with improved cancellation handling, phase-fair priority, and a reentrant readers-writer lock (RWLock). It supports Python >=3.8 and is built on top of the aiologic library. The current version is 0.11.0; releases ship roughly monthly.

pip install culsans
error from culsans.queue import Queue ModuleNotFoundError: No module named 'culsans.queue'
cause Queue is not in a submodule; it is directly exported from the top-level package.
fix
Use 'from culsans import Queue'
error AttributeError: module 'culsans' has no attribute 'LifoQueue'
cause LifoQueue is exported from the top-level package, so this error usually means an outdated culsans installation or an incorrect import path.
fix
Ensure you have installed culsans >=0.7.0. Use 'from culsans import LifoQueue' if you need LIFO behavior.
breaking In version 0.9.0, checkpoint functions are now called before queue operations instead of after. This may increase the number of explicit context switches, affecting performance or behavior in tight loops.
fix If you rely on checkpoint timing, review your code for any assumptions about when checkpoints are called. Consider using synchronous methods if context switches are costly.
deprecated Python 3.8 support was added in 0.8.0, but Python 3.8 is itself end-of-life. Future versions may drop Python 3.8 support.
fix Upgrade to Python 3.9+ to ensure compatibility with future releases.
gotcha QueueShutDown is raised on various operations after close() or shutdown(), consistent with Python 3.13's queue shutdown. However, on Python <3.13, the exception is backported and may cause name conflicts with type checkers.
fix Use 'from culsans import QueueShutDown' to ensure compatibility across Python versions; avoid importing from the stdlib's asyncio.queues.
gotcha MixedQueue is a protocol, not a concrete class. Do not instantiate MixedQueue directly.
fix Use MixedQueue only for type annotations; use Queue, LifoQueue, PriorityQueue, etc. for instantiation.

Basic async producer-consumer example using Culsans Queue.

import asyncio
from culsans import Queue

async def producer(queue):
    for i in range(5):
        await queue.put(i)
        print(f'Produced {i}')

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f'Consumed {item}')
        queue.task_done()

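async def main():
    queue = Queue()
    # Use the asyncio-style interface exposed as queue.async_q
    # (queue.sync_q is the blocking counterpart for threads).
    prod = asyncio.create_task(producer(queue.async_q))
    cons = asyncio.create_task(consumer(queue.async_q))
    await prod
    await queue.async_q.put(None)  # sentinel tells the consumer to stop
    await cons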

asyncio.run(main())