NATS Client for Python
nats-py is an asyncio-based Python client for the NATS messaging system, providing robust support for core NATS features like publish/subscribe, request/reply, and queue groups, as well as advanced functionalities through JetStream, Key-Value, and Microservices. It is actively maintained by NATS.io, with frequent patch and minor releases, ensuring compatibility with the latest NATS server features. The current version is 2.14.0.
Warnings
- breaking In `v2.11.0`, Key-Value (KV) keys validation became enabled by default. Previously invalid keys might now cause errors unless explicitly opted out.
- breaking The library was renamed from `asyncio-nats-client` to `nats-py` in `v2.0.0`. Key API changes include `subscribe()` no longer returning a Subscription ID directly (it returns a `Subscription` object), and error class names were updated to follow PEP-8 conventions (e.g., `ErrSlowConsumer` became `SlowConsumerError`).
- gotcha `nats-py` is an `asyncio`-native client. All core operations are asynchronous and must be `await`ed within `async def` functions. Attempting to call these directly from synchronous code will result in runtime errors.
- gotcha Proper connection lifecycle management is crucial. Connections should be explicitly closed or drained, especially when using a `nats.connect()` without `async with`. Use `await nc.close()` or `await nc.drain()` to ensure all pending messages are sent and resources are released gracefully.
- gotcha The NATS server clustering protocol is incompatible between NATS server v1 and v2. While `nats-py` is compatible with both server versions, a rolling upgrade of a NATS server cluster from v1 to v2 (or vice-versa) can lead to split-brain scenarios and client connection issues during the transition period.
- gotcha In `v2.13.0`, the `token` parameter in `nats.connect()` was enhanced to accept a callable, enabling dynamic token refresh on reconnect. If you were previously relying on static token behavior for reconnections, be aware of this new capability.
Install
-
pip install nats-py -
pip install nats-py[nkeys]
Imports
- nats.connect
import nats nc = await nats.connect("nats://localhost:4222")
Quickstart
import asyncio
import nats
import os
async def main():
nats_url = os.environ.get("NATS_URL", "nats://localhost:4222")
try:
# Connect to NATS. Use async with for graceful connection management.
async with await nats.connect(nats_url) as nc:
print(f"Connected to NATS at {nats_url}")
# Simple asynchronous message handler
async def message_handler(msg):
subject = msg.subject
data = msg.data.decode()
print(f"Received on '{subject}': {data}")
# Subscribe to a subject
sub = await nc.subscribe("foo", cb=message_handler)
print("Subscribed to 'foo'")
# Publish a message
await nc.publish("foo", b'Hello NATS from Python!')
print("Published 'Hello NATS from Python!' to 'foo'")
# Wait for a short period to allow message processing
await asyncio.sleep(1)
# Unsubscribe and drain connection
await sub.unsubscribe()
await nc.drain()
print("Drained connection and unsubscribed.")
except nats.errors.NoServersError:
print(f"Could not connect to NATS at {nats_url}. Is the server running?")
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == '__main__':
# Run the main asynchronous function
asyncio.run(main())