janus
janus is a Python library providing mixed synchronous and asynchronous queues to facilitate communication between classic threaded code and asyncio tasks. It offers `Queue`, `LifoQueue`, and `PriorityQueue` implementations, each with distinct synchronous (`.sync_q`) and asynchronous (`.async_q`) interfaces. The current version is 2.0.0, and the library is actively maintained to support the latest Python versions.
Warnings
- breaking In v2.0.0, the `shutdown()` method's error handling changed. Calling `shutdown()` on a closed queue now raises `janus.AsyncQueueShutDown` or `janus.SyncQueueShutDown` instead of `RuntimeError`. Additionally, `task_done()` and `join()` methods no longer raise exceptions on queue shutdown/closing, aligning with stdlib queue behavior. This may require updating exception handling logic. [cite: Release Notes]
- breaking Version 1.1.0 dropped support for Python 3.7 and 3.8. The library now requires Python 3.9 or higher. [cite: Release Notes]
- breaking As of v0.5.0, explicit `loop` arguments were removed from `janus.Queue()` instantiation, and it became forbidden to create queues outside an active asyncio event loop. The library now automatically uses `asyncio.get_running_loop()`. [cite: Release Notes]
- gotcha It is crucial to call `await queue.aclose()` when you are finished with a `janus.Queue`. Failure to do so can lead to `asyncio` generating error messages or resource leaks, as the library creates internal tasks to manage notifications.
- gotcha janus queues are specifically designed for interoperation between synchronous threads and asynchronous asyncio tasks. For purely synchronous (thread-to-thread) or purely asynchronous (asyncio-to-asyncio) communication, using standard `queue.Queue` or `asyncio.Queue` respectively is recommended, as `janus` can introduce significant slowdowns in these single-mode scenarios.
- gotcha janus queues cannot be used for communication between two *different* asyncio event loops. Like other asyncio primitives, they are bound to the specific event loop in which they are created.
Install
-
pip install janus
Imports
- Queue
from janus import Queue
- LifoQueue
from janus import LifoQueue
- PriorityQueue
from janus import PriorityQueue
- SyncQueue
from janus import SyncQueue
- AsyncQueue
from janus import AsyncQueue
- SyncQueueEmpty
from janus import SyncQueueEmpty
- AsyncQueueEmpty
from janus import AsyncQueueEmpty
Quickstart
import asyncio
import threading
import janus
def threaded_producer(sync_q: janus.SyncQueue[int]) -> None:
print("Thread: Starting producer")
for i in range(5):
print(f"Thread: Putting {i}")
sync_q.put(i)
sync_q.join() # Wait for all items to be processed by async_coro
print("Thread: Producer finished and joined")
async def async_consumer(async_q: janus.AsyncQueue[int]) -> None:
print("Async: Starting consumer")
for _ in range(5):
val = await async_q.get()
print(f"Async: Got {val}")
async_q.task_done()
print("Async: Consumer finished")
async def main() -> None:
queue: janus.Queue[int] = janus.Queue()
loop = asyncio.get_running_loop()
# Run the synchronous producer in a separate thread
producer_thread = threading.Thread(target=threaded_producer, args=(queue.sync_q,))
producer_thread.start()
# Run the asynchronous consumer
await async_consumer(queue.async_q)
producer_thread.join() # Ensure thread completes before closing queue
await queue.aclose() # Crucial for proper shutdown of janus resources
print("Main: Queue closed")
if __name__ == '__main__':
asyncio.run(main())