OpenTelemetry Celery Instrumentation
This library provides OpenTelemetry instrumentation for Celery: it automatically collects telemetry such as task execution times, worker performance, and error rates, and captures distributed traces across your task pipeline. The current version is `0.61b0`; packages in the `opentelemetry-python-contrib` repository typically follow a monthly release cadence for beta versions.
Warnings
- gotcha OpenTelemetry Python contrib packages, including Celery instrumentation, are currently in beta. This means APIs and behavior may change in future versions, and they are generally not recommended for production environments without careful consideration.
- breaking The `CeleryInstrumentor` must be initialized within the `celery.signals.worker_process_init` signal handler when using Celery's prefork worker model. Failing to do so can lead to incorrect or missing traces, as child processes may not inherit the parent's OpenTelemetry state properly.
- gotcha Telemetry data (traces, metrics) will not be collected or visible without a properly configured OpenTelemetry SDK and an exporter (e.g., OTLP, Jaeger, Console). Ensure `TracerProvider`, `SpanProcessor`, and an `Exporter` are set up.
- gotcha OpenTelemetry Python is adopting a semantic convention migration plan. While currently focused on HTTP-related instrumentations, this may eventually affect all types of instrumentations, including Celery, and could introduce breaking changes to attribute names or span structures.
- gotcha The `opentelemetry-instrumentation-celery` package only instruments Celery itself. Depending on your Celery broker (e.g., Redis, RabbitMQ) and result backend, you might need to install additional OpenTelemetry instrumentations (e.g., `opentelemetry-instrumentation-redis`, `opentelemetry-instrumentation-pika`).
Install
- pip
pip install opentelemetry-instrumentation-celery opentelemetry-sdk
Imports
- CeleryInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor
Quickstart
import os
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from celery import Celery
from celery.signals import worker_process_init
# Configure OpenTelemetry Tracer Provider
# For local development, ConsoleSpanExporter is useful. For production, OTLPSpanExporter.
resource = Resource.create({"service.name": os.environ.get('OTEL_SERVICE_NAME', 'celery-app')})
provider = TracerProvider(resource=resource)
# Choose an exporter. For OTLP, ensure an OTLP endpoint is available (e.g., OpenTelemetry Collector).
# Exporter endpoint can be configured via environment variable OTEL_EXPORTER_OTLP_ENDPOINT
if os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT'):
    exporter = OTLPSpanExporter()
else:
    exporter = ConsoleSpanExporter()
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Initialize Celery app
app = Celery(
    'my_celery_app',
    broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
)
# Crucial: Initialize instrumentation AFTER Celery worker process is initialized
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    CeleryInstrumentor().instrument()
    print("OpenTelemetry Celery instrumentation initialized.")
# Define a simple task
@app.task
def add(x, y):
    # Manually create a child span inside the auto-instrumented task span.
    with trace.get_tracer(__name__).start_as_current_span("add.task.execution") as span:
        result = x + y
        span.set_attribute("sum", result)
        print(f"Task 'add' executed: {x} + {y} = {result}")
        return result
if __name__ == '__main__':
    # Example of how to send a task (typically done from a separate producer process)
    print("Sending task...")
    task = add.delay(1, 2)
    print(f"Task ID: {task.id}")
    print(f"Task result: {task.get(timeout=10)}")
# To run the worker (from your terminal):
# OTEL_SERVICE_NAME="celery-worker" OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" celery -A your_module_name worker -l info