AMQP 1.0 Client Library for Python

1.6.11 · maintenance · verified Thu Apr 09

uAMQP is an AMQP 1.0 Client Library for Python, serving as a C-extension wrapper around the Azure uAMQP C library. It provides low-level AMQP protocol implementation. The current version is 1.6.11. As of v1.6.5, the library is in maintenance mode, with major Azure SDKs (Event Hubs, Service Bus) now utilizing a pure Python AMQP library for new development.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to establish an AMQP connection, authenticate using SASLPlain, and send a message to a target entity (e.g., an Azure Service Bus queue or Event Hub). It assumes authentication details are provided via environment variables.

import os
import time
import uamqp
from uamqp import authentication, Message, SenderLink, Connection

# Set these environment variables for a runnable example:
# AMQP_ADDRESS: e.g., "amqps://<namespace>.servicebus.windows.net/<entity_name>"
# SAS_POLICY: e.g., "RootManageSharedAccessKey"
# SAS_KEY: The shared access key for the policy
AMQP_ADDRESS = os.environ.get("AMQP_ADDRESS", "amqps://localhost:5672/my_queue")
SAS_POLICY = os.environ.get("SAS_POLICY", "your_sas_policy_name")
SAS_KEY = os.environ.get("SAS_KEY", "your_sas_key")

conn = None
sender = None

try:
    if not all([AMQP_ADDRESS, SAS_POLICY, SAS_KEY]):
        print("Please set AMQP_ADDRESS, SAS_POLICY, and SAS_KEY environment variables.")
        exit(1)

    # Extract hostname (e.g., <namespace>.servicebus.windows.net) from AMQP_ADDRESS
    hostname = AMQP_ADDRESS.split('//', 1)[-1].split('/', 1)[0].split(':', 1)[0]
    # Extract target address (e.g., <entity_name>) from AMQP_ADDRESS
    target_address = AMQP_ADDRESS.split('/', 3)[-1]

    # Create SASL authentication
    sasl_auth = authentication.SASLPlain(
        hostname=hostname,
        username=SAS_POLICY,
        password=SAS_KEY
    )

    # Create an AMQP connection
    print(f"Connecting to {hostname}...")
    conn = Connection(
        hostname,
        sasl_auth=sasl_auth,
        idle_timeout=10000,
        debug=False # Set to True for verbose AMQP tracing
    )

    # Create a SenderLink to send messages
    sender = SenderLink(conn, target_address, debug=False)
    sender.open()
    print(f"SenderLink opened to {target_address}")

    # Create and send a message
    message_body = f"Hello from uAMQP at {time.time()}!"
    message = Message(message_body.encode('utf-8'))
    sender.send_messages([message])
    print(f"Sent message: '{message_body}'")

    time.sleep(1) # Give time for message to be sent

except Exception as e:
    print(f"An error occurred: {e}")
    print("Ensure your environment variables are correctly configured and the AMQP endpoint is accessible.")
finally:
    if sender and sender.is_open:
        sender.close()
        print("SenderLink closed.")
    if conn and conn.is_open:
        conn.close()
        print("Connection closed.")

view raw JSON →