janus

2.0.0 · active · verified Sat Apr 11

janus is a Python library providing mixed synchronous and asynchronous queues for passing data between classic threaded code and asyncio tasks. It offers `Queue`, `LifoQueue`, and `PriorityQueue` implementations, each exposing a synchronous interface (`.sync_q`) for threads and an asynchronous interface (`.async_q`) for coroutines, both backed by the same underlying queue. The current version is 2.0.0, and the library is actively maintained to support the latest Python versions.

Warnings

Install

Imports

Quickstart

This example demonstrates how to use `janus.Queue` to allow a synchronous thread to put items into a queue while an asynchronous coroutine concurrently consumes them. It highlights the use of `.sync_q` and `.async_q` interfaces and the important `aclose()` call for cleanup.

import asyncio
import threading
import janus

def threaded_producer(sync_q: janus.SyncQueue[int]) -> None:
    print("Thread: Starting producer")
    for i in range(5):
        print(f"Thread: Putting {i}")
        sync_q.put(i)
    sync_q.join()  # Block until async_consumer has called task_done() for every item
    print("Thread: Producer finished and joined")

async def async_consumer(async_q: janus.AsyncQueue[int]) -> None:
    print("Async: Starting consumer")
    for _ in range(5):
        val = await async_q.get()
        print(f"Async: Got {val}")
        async_q.task_done()
    print("Async: Consumer finished")

async def main() -> None:
    queue: janus.Queue[int] = janus.Queue()

    # Run the synchronous producer in a separate thread
    producer_thread = threading.Thread(target=threaded_producer, args=(queue.sync_q,))
    producer_thread.start()

    # Run the asynchronous consumer
    await async_consumer(queue.async_q)

    # Wait for the thread in a worker so the event loop is never blocked
    await asyncio.to_thread(producer_thread.join)
    await queue.aclose() # Crucial for proper shutdown of janus resources
    print("Main: Queue closed")

if __name__ == '__main__':
    asyncio.run(main())
