Oslo Messaging

17.3.0 · active · verified Thu Apr 16

Oslo Messaging (`oslo.messaging` on PyPI) is a foundational library that gives OpenStack services an API for inter-process communication (IPC). It supports both Remote Procedure Call (RPC) and notification patterns over message queue backends such as RabbitMQ and Kafka. The Kafka driver handles notifications only, and the ZeroMQ driver that older releases shipped has been removed. The current version is 17.3.0, and the library follows the OpenStack release cycle, with a coordinated release roughly every six months.

Quickstart

This quickstart demonstrates the core components of oslo.messaging: setting up a transport, defining an RPC target, creating an RPC client, and using a notifier. It shows RPC with the blocking `call` method and the fire-and-forget `cast` method, plus a simple notification. To run it, make sure a message broker such as RabbitMQ is running and install the library with `pip install oslo.messaging`. The RabbitMQ driver's dependencies are part of the base install, while Kafka support needs the `kafka` extra (`pip install oslo.messaging[kafka]`). The example reads the connection string from the `OSLO_MESSAGING_TRANSPORT_URL` environment variable; that is a convention of this script, not something the library reads on its own.

import os
import sys

from oslo_config import cfg
from oslo_messaging import (
    Notifier,
    Target,
    get_notification_transport,
    get_rpc_client,
    get_rpc_server,
    get_rpc_transport,
)

# NOTE: This example requires a running message broker (e.g., RabbitMQ).
# The RabbitMQ driver's dependencies ship with the base package
# (`pip install oslo.messaging`).

# 1. Configure the transports.
#    In a real application, transport_url would be loaded from a config file.
#    The `transport_url` option is only registered once a transport is created,
#    so the URL is passed through the `url` argument instead of set_default().
CONF = cfg.CONF
TRANSPORT_URL = os.environ.get(
    'OSLO_MESSAGING_TRANSPORT_URL', 'rabbit://guest:guest@localhost:5672/')

try:
    # Creating a transport loads the driver. Broker connections are generally
    # opened on first use, so a failure here usually means a malformed URL or
    # a missing driver rather than an unreachable broker.
    rpc_transport = get_rpc_transport(CONF, url=TRANSPORT_URL)
    notification_transport = get_notification_transport(CONF, url=TRANSPORT_URL)
except Exception as e:
    print(f"Error getting transport: {e}. Check the transport URL and that the driver is installed.")
    sys.exit(1)

# --- RPC Example ---

# RPC server endpoint: each public method can be invoked by clients.
class MyHandler:
    def say_hello(self, ctxt, name):
        print(f"[Server] Received 'say_hello' for: {name} (context: {ctxt})")
        return f"Hello, {name}!"

    def add_numbers(self, ctxt, a, b):
        print(f"[Server] Received 'add_numbers' for: {a}, {b} (context: {ctxt})")
        return a + b

# Define a target for the RPC server and client
rpc_target = Target(topic='my_service_topic', server='my_host')

# RPC server setup (typically in a separate process).
# With the 'threading' executor, start() returns immediately and requests are
# handled on background threads, so the client below can run in this process.
rpc_server = get_rpc_server(
    rpc_transport, rpc_target, [MyHandler()], executor='threading')
rpc_server.start()
print("[RPC Server] Listening on topic 'my_service_topic'.")

# RPC client setup
rpc_client = get_rpc_client(rpc_transport, rpc_target)

# Simulate a request context (a plain dict works with the default serializer)
context = {'user_id': 'test_user', 'tenant_id': 'test_tenant'}

# Make RPC calls
try:
    # call() blocks until the server replies or the call times out
    result_hello = rpc_client.call(context, 'say_hello', name='World')
    print(f"[RPC Client] Result of 'say_hello': {result_hello}")

    result_add = rpc_client.call(context, 'add_numbers', a=5, b=3)
    print(f"[RPC Client] Result of 'add_numbers': {result_add}")

    # cast() is fire and forget: no reply is awaited, so the server may
    # still be processing this message when the script moves on
    rpc_client.cast(context, 'say_hello', name='Async User')
    print("[RPC Client] Sent 'say_hello' cast for 'Async User'")

except Exception as e:
    print(f"[RPC Client] Error making RPC call: {e}. Ensure a server is running and the message queue is accessible.")

# --- Notification Example ---

# The `topic` keyword is no longer accepted; pass a list via `topics`.
# driver='messaging' sends notifications over the transport; without a driver
# (here or in configuration) nothing is emitted.
notifier = Notifier(
    notification_transport,
    publisher_id='quickstart.my_host',
    driver='messaging',
    topics=['my_notifications_topic'],
)

# Publish a notification
notification_payload = {'event_type': 'resource_created', 'resource_id': 'xyz-123'}
notifier.info(context, 'resource.create.start', notification_payload)
print(f"[Notifier] Sent 'info' notification: {notification_payload}")

# Stop the RPC server, then clean up transport resources
rpc_server.stop()
rpc_server.wait()
rpc_transport.cleanup()
notification_transport.cleanup()
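
The transport URL used in the quickstart (`rabbit://guest:guest@localhost:5672/`) follows the general shape `driver://user:password@host:port[,user:password@host:port...]/virtual_host`. The sketch below uses only the Python standard library to show which part of the string maps to which setting. `describe_transport_url` is a hypothetical helper written for illustration and is not part of oslo.messaging; the library has its own parser (`oslo_messaging.TransportURL.parse`), which real code should prefer. The sketch ignores query parameters and assumes no IPv6 literals.

```python
from urllib.parse import unquote, urlsplit


def describe_transport_url(url):
    """Hypothetical helper: split a transport URL into its visible parts."""
    parts = urlsplit(url)
    hosts = []
    # Several brokers may be listed, separated by commas, each with
    # its own optional credentials.
    for entry in parts.netloc.split(','):
        if not entry:
            continue
        creds, _, hostport = entry.rpartition('@')
        username, _, password = creds.partition(':')
        host, _, port = hostport.partition(':')
        hosts.append({
            'username': unquote(username) or None,
            'password': unquote(password) or None,
            'hostname': host,
            'port': int(port) if port else None,
        })
    return {
        'driver': parts.scheme,  # selects the backend, e.g. 'rabbit' or 'kafka'
        'hosts': hosts,
        'virtual_host': unquote(parts.path[1:]) or None,
    }


info = describe_transport_url('rabbit://guest:guest@localhost:5672/')
print(info['driver'], info['hosts'][0]['hostname'], info['hosts'][0]['port'])
# prints: rabbit localhost 5672
```

Listing more than one `host:port` entry is how a clustered broker is usually described, so a helper like this returns a list of hosts rather than a single one.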
