Oslo Messaging
Oslo Messaging is a foundational library that provides an inter-process communication (IPC) API for OpenStack services. It supports both Remote Procedure Call (RPC) and notification patterns over message queue backends such as RabbitMQ, Kafka, and ZeroMQ. The current version is 17.3.0, and the library follows the OpenStack release cadence of roughly one release every six months.
Common errors
- oslo_messaging.exceptions.NoBackendFound: No backend found for transport URL: rabbit://guest:guest@localhost:5672/
cause: The required message queue backend library (e.g., `amqp` for RabbitMQ) is not installed.
fix: Install the specific backend using pip extras: `pip install oslo-messaging[rabbit]` or `pip install oslo-messaging[kafka]`.
- AttributeError: 'RPCClient' object has no attribute 'notify'
cause: Notification methods (such as `notify` or `info`) were called on an `RPCClient` instance. These methods belong to the `Notifier` class.
fix: Use the `Notifier` class for sending notifications: `notifier = Notifier(transport, topics=['mytopic'])`, then `notifier.info(context, 'event.type', payload)`.
- ImportError: cannot import name 'get_notifier' from 'oslo_messaging'
cause: `oslo_messaging` does not export a `get_notifier` function; `Notifier` is instantiated directly.
fix: Instantiate `Notifier` directly: `notifier = Notifier(transport, topics=['mytopic'])`.
- amqp.exceptions.ConnectionForced: Connection.forbid-method: no auth mechanism found
cause: The credentials in the `transport_url` are incorrect, or the RabbitMQ user does not exist or lacks the required permissions.
fix: Verify the username and password in your `transport_url` (e.g., `rabbit://user:pass@host:port/`) and ensure the RabbitMQ user has appropriate access.
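Before suspecting the broker, it can help to confirm that the `transport_url` actually contains the parts you expect. The following is a minimal sketch using only the Python standard library; `describe_transport_url` is a hypothetical helper, not part of oslo.messaging, and it assumes a single-host URL. It inspects the shape of the URL only and cannot tell whether the broker will accept the credentials.

```python
from urllib.parse import urlsplit


def describe_transport_url(url):
    """Split a single-host transport URL into its parts (hypothetical helper)."""
    parts = urlsplit(url)
    return {
        "driver": parts.scheme,
        "username": parts.username,
        # Report only whether a password is present, never its value.
        "has_password": parts.password is not None,
        "host": parts.hostname,
        "port": parts.port,
        "virtual_host": parts.path.lstrip("/"),
    }


info = describe_transport_url("rabbit://guest:guest@localhost:5672/")
print(info)
```

An empty `username` or a missing port in the output is usually a sign of a malformed URL rather than a RabbitMQ permission problem.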
Warnings
- gotcha oslo-messaging requires a specific message queue backend to be installed separately (e.g., `amqp` for RabbitMQ, `kafka-python` for Kafka). Installing just `oslo-messaging` will lead to 'No backend found' errors.
- gotcha Configuration of transport URLs and other settings is primarily handled by `oslo_config`. While direct keyword arguments can be passed to `get_transport` for simple cases, production-grade applications should integrate `oslo_config` for robust and flexible configuration management.
- breaking Prior to version 7.0.0, `oslo.messaging` supported older `qpid` client libraries. That support was removed, so upgrading requires either migrating to `amqp>=2.0` for RabbitMQ or switching to another backend.
- gotcha The OpenStack ecosystem relies on a 'context' object (e.g., a dictionary containing user_id, tenant_id, etc.) being passed with every RPC call or notification. Forgetting to pass this context can lead to missing audit information or authorization failures in downstream services.
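One way to avoid sending an incomplete context is to build it through a small function that fails early. This is a sketch only: `build_context` and `REQUIRED_CONTEXT_KEYS` are hypothetical names, and the set of required keys is an assumption taken from the example keys mentioned above. Real services typically define their own context class and their own required fields.

```python
# Assumed minimum set of keys; adjust to what your downstream services expect.
REQUIRED_CONTEXT_KEYS = ("user_id", "tenant_id")


def build_context(**fields):
    """Return a context dict, raising early if a required key is missing or empty."""
    missing = [key for key in REQUIRED_CONTEXT_KEYS if not fields.get(key)]
    if missing:
        raise ValueError(f"context is missing required keys: {', '.join(missing)}")
    return dict(fields)


context = build_context(user_id="test_user", tenant_id="test_tenant")
print(context)
```

The resulting dictionary can then be passed as the first argument to RPC calls and notifications in place of a hand-written dict.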
Install
- pip install oslo-messaging
- pip install oslo-messaging[rabbit] # for RabbitMQ
- pip install oslo-messaging[kafka] # for Kafka
- pip install oslo-messaging[zeromq] # for ZeroMQ
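After installing, you may want to confirm which backend client libraries are importable in the current environment. The sketch below uses only the standard library. `installed_backends` is a hypothetical helper, and the module names in `BACKEND_MODULES` are assumptions based on the packages named in this document (`amqp`, and `kafka` as the import name of `kafka-python`); adjust them to whatever your driver actually imports.

```python
from importlib.util import find_spec

# Assumed import names for the client libraries mentioned in this document.
BACKEND_MODULES = {"rabbit": "amqp", "kafka": "kafka"}


def installed_backends(modules=BACKEND_MODULES):
    """Map each backend name to True if its client module is importable."""
    return {name: find_spec(module) is not None for name, module in modules.items()}


print(installed_backends())
```

A `False` entry means the module cannot be imported from this interpreter, which is worth checking before debugging the transport URL itself.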
Imports
- get_transport
from oslo_messaging import get_transport
- Target
from oslo_messaging import Target
- RPCClient
from oslo_messaging import RPCClient
- get_rpc_server
from oslo_messaging import get_rpc_server
- Notifier
from oslo_messaging import Notifier
Quickstart
import os
from oslo_config import cfg
from oslo_messaging import get_transport, RPCClient, Target, get_rpc_server, Notifier

# NOTE: This example requires a running message queue (e.g., RabbitMQ)
# and appropriate backend packages (e.g., `pip install oslo-messaging[rabbit]`)

# 1. Configure the transport.
# In a real application, transport_url would be loaded from a config file via oslo_config.
# Here it is passed explicitly through the `url` argument, so no option has to be
# registered on CONF beforehand.
CONF = cfg.CONF
transport_url = os.environ.get('OSLO_MESSAGING_TRANSPORT_URL', 'rabbit://guest:guest@localhost:5672/')
try:
    transport = get_transport(CONF, url=transport_url)
except Exception as e:
    print(f"Error getting transport: {e}. Make sure your message queue is running and transport_url is correct.")
    raise SystemExit(1)

# --- RPC Example ---

# RPC Server Handler
class MyHandler(object):
    def say_hello(self, ctxt, name):
        print(f"[Server] Received 'say_hello' for: {name} (context: {ctxt})")
        return f"Hello, {name}!"

    def add_numbers(self, ctxt, a, b):
        print(f"[Server] Received 'add_numbers' for: {a}, {b} (context: {ctxt})")
        return a + b

# Define a target for the RPC server and client
rpc_target = Target(topic='my_service_topic', server='my_host')

# RPC Server setup (typically in a separate process/thread)
rpc_server = get_rpc_server(transport, rpc_target, [MyHandler()])
# In a real app, you'd start this in a loop or dedicated process
# rpc_server.start()
# rpc_server.wait()
print("[RPC Server] Handler registered for topic 'my_service_topic'. (Not started for this quickstart)")

# RPC Client setup
rpc_client = RPCClient(transport, rpc_target)

# Simulate a context object (defined outside the try block so the notifier can reuse it)
context = {'user_id': 'test_user', 'tenant_id': 'test_tenant'}

# Make an RPC call
try:
    # call() blocks until a reply arrives or the RPC timeout expires,
    # so it fails here unless a server is actually running.
    result_hello = rpc_client.call(context, 'say_hello', name='World')
    print(f"[RPC Client] Result of 'say_hello': {result_hello}")
    result_add = rpc_client.call(context, 'add_numbers', a=5, b=3)
    print(f"[RPC Client] Result of 'add_numbers': {result_add}")
    # Cast (fire and forget)
    rpc_client.cast(context, 'say_hello', name='Async User')
    print("[RPC Client] Sent 'say_hello' cast for 'Async User'")
except Exception as e:
    print(f"[RPC Client] Error making RPC call: {e}. Ensure a server is running and message queue is accessible.")

# --- Notification Example ---
# `topics` takes a list; `driver='messaging'` selects the message-queue notification driver.
notifier = Notifier(transport, driver='messaging', topics=['my_notifications_topic'])

# Publish a notification
notification_payload = {'event_type': 'resource_created', 'resource_id': 'xyz-123'}
notifier.info(context, 'resource.create.start', notification_payload)
print(f"[Notifier] Sent 'info' notification: {notification_payload}")

# Clean up transport resources
transport.cleanup()
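Because the quickstart needs a running broker, it can be useful to exercise the handler logic on its own first. The sketch below is a plain-Python stand-in, not oslo.messaging's dispatcher: `call_locally` is a hypothetical helper that only mimics the `(ctxt, **kwargs)` calling shape used by the endpoint methods above, and the handler is repeated without its print statements so the block is self-contained.

```python
class MyHandler:
    def say_hello(self, ctxt, name):
        return f"Hello, {name}!"

    def add_numbers(self, ctxt, a, b):
        return a + b


def call_locally(endpoint, ctxt, method, **kwargs):
    """Invoke a public endpoint method by name with a context and keyword arguments."""
    if method.startswith("_") or not callable(getattr(endpoint, method, None)):
        raise AttributeError(f"endpoint has no public method {method!r}")
    return getattr(endpoint, method)(ctxt, **kwargs)


context = {"user_id": "test_user", "tenant_id": "test_tenant"}
hello = call_locally(MyHandler(), context, "say_hello", name="World")
total = call_locally(MyHandler(), context, "add_numbers", a=5, b=3)
print(hello)
print(total)
```

This checks argument names and return values without any transport involved; serialization, timeouts, and routing still need to be tested against a real backend.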