NATS Client for Python

2.14.0 · active · verified Fri Apr 10

nats-py is an asyncio-based Python client for the NATS messaging system. It supports core NATS features such as publish/subscribe, request/reply, and queue groups, as well as JetStream, Key-Value, and Microservices. It is actively maintained by NATS.io, with frequent patch and minor releases that track the latest NATS server features. The current version is 2.14.0.

Quickstart

This quickstart connects to a NATS server, subscribes to a subject with an asynchronous callback, publishes a message, and shuts down gracefully using `async with`. The server address is read from the `NATS_URL` environment variable and defaults to `nats://localhost:4222`.

import asyncio
import os

import nats
import nats.errors

async def main():
    nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
    try:
        # Connect to NATS. Use async with for graceful connection management.
        async with await nats.connect(nats_url) as nc:
            print(f"Connected to NATS at {nats_url}")

            # Simple asynchronous message handler
            async def message_handler(msg):
                subject = msg.subject
                data = msg.data.decode()
                print(f"Received on '{subject}': {data}")

            # Subscribe to a subject
            sub = await nc.subscribe("foo", cb=message_handler)
            print("Subscribed to 'foo'")

            # Publish a message
            await nc.publish("foo", b'Hello NATS from Python!')
            print("Published 'Hello NATS from Python!' to 'foo'")

            # Give the handler a moment to process the message
            await asyncio.sleep(1)

            # Unsubscribe, then drain so pending messages are handled before closing
            await sub.unsubscribe()
            await nc.drain()
            print("Unsubscribed and drained connection.")

    except nats.errors.NoServersError:
        print(f"Could not connect to NATS at {nats_url}. Is the server running?")
    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == '__main__':
    # Run the main asynchronous function
    asyncio.run(main())
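
The quickstart covers publish/subscribe only. Request/reply and queue groups, which the overview also lists, can be sketched in the same way. This is a minimal sketch, not a definitive implementation: the subject `echo`, the queue group name `workers`, and the handler `echo_handler` are illustrative names that do not come from the original page, and a NATS server is assumed to be reachable at `NATS_URL`.

```python
import asyncio
import os


async def echo_handler(msg):
    """Reply to a request with its payload prefixed by 'echo: ' (illustrative handler)."""
    await msg.respond(b"echo: " + msg.data)


async def main():
    # Imported here so echo_handler can be exercised without nats-py installed.
    import nats

    nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
    nc = await nats.connect(nats_url)
    try:
        # Subscribers that share a queue group name split the load:
        # each message is delivered to only one member of the group.
        await nc.subscribe("echo", queue="workers", cb=echo_handler)

        # request() publishes the payload and waits for a single reply,
        # raising an error if none arrives within the timeout (in seconds).
        reply = await nc.request("echo", b"ping", timeout=1)
        print(f"Reply: {reply.data.decode()}")
    finally:
        # Drain handles pending messages before closing the connection.
        await nc.drain()
```

To try it against a running server, call `asyncio.run(main())`. Because the handler only relies on `msg.data` and `msg.respond`, it can be checked in isolation with a stand-in message object.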
