OpenTelemetry Celery Instrumentation

0.61b0 · active · verified Thu Apr 09

This library provides OpenTelemetry instrumentation for Celery. It automatically creates spans when tasks are published and executed, so you can monitor task execution times and failures and follow distributed traces across your task pipeline. The current version is `0.61b0`; packages in the `opentelemetry-python-contrib` repository typically follow a monthly release cadence for beta versions.

Warnings

Install

Imports

Quickstart

This example sets up OpenTelemetry tracing for a Celery application. It configures a `TracerProvider` with a `BatchSpanProcessor` and an `OTLPSpanExporter`, falling back to `ConsoleSpanExporter` when `OTEL_EXPORTER_OTLP_ENDPOINT` is not set. Crucially, `CeleryInstrumentor` is initialized inside the `worker_process_init` signal handler, so instrumentation is set up in each worker process after it is forked; this is what keeps trace context propagation working under Celery's prefork worker model. For OTLP export, set the `OTEL_SERVICE_NAME` and `OTEL_EXPORTER_OTLP_ENDPOINT` environment variables.

import os
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from celery import Celery
from celery.signals import worker_process_init

# Configure OpenTelemetry Tracer Provider
# For local development, ConsoleSpanExporter is useful. For production, OTLPSpanExporter.
resource = Resource.create({"service.name": os.environ.get('OTEL_SERVICE_NAME', 'celery-app')})
provider = TracerProvider(resource=resource)

# Choose an exporter. For OTLP, ensure an OTLP endpoint is available (e.g., OpenTelemetry Collector).
# Exporter endpoint can be configured via environment variable OTEL_EXPORTER_OTLP_ENDPOINT
if os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT'):
    exporter = OTLPSpanExporter()
else:
    exporter = ConsoleSpanExporter()

processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Initialize Celery app
app = Celery(
    'my_celery_app',
    broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
)

# Crucial: Initialize instrumentation AFTER Celery worker process is initialized
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    CeleryInstrumentor().instrument()
    print("OpenTelemetry Celery instrumentation initialized.")

# Define a simple task
@app.task
def add(x, y):
    # trace.get_tracer() is the public API for obtaining a tracer; there is no get_current_tracer().
    with trace.get_tracer(__name__).start_as_current_span("add.task.execution") as span:
        result = x + y
        span.set_attribute("sum", result)
        print(f"Task 'add' executed: {x} + {y} = {result}")
        return result

if __name__ == '__main__':
    # Example of how to send a task (typically done from a separate producer process)
    print("Sending task...")
    task = add.delay(1, 2)
    print(f"Task ID: {task.id}")
    print(f"Task result: {task.get(timeout=10)}")

    # To run the worker (from your terminal):
    # OTEL_SERVICE_NAME="celery-worker" OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" celery -A your_module_name worker -l info
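The trace context propagation mentioned above can be pictured with a small standard-library sketch. It assumes the W3C Trace Context `traceparent` header format (`version-traceid-spanid-flags`), which OpenTelemetry's default propagator uses. The helper names `inject`, `extract`, and `start_child_span` are hypothetical and written only for this illustration; they are not the instrumentation's actual code, which handles this for you through Celery signals.

```python
import re
import secrets

# W3C Trace Context header: version-traceid-spanid-flags, all lowercase hex.
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject(headers, trace_id, span_id, sampled=True):
    """Producer side (hypothetical helper): write the span identity into message headers."""
    flags = "01" if sampled else "00"
    headers["traceparent"] = f"00-{trace_id}-{span_id}-{flags}"
    return headers


def extract(headers):
    """Worker side (hypothetical helper): recover the parent identity, or None if absent or invalid."""
    match = TRACEPARENT_RE.match(headers.get("traceparent", ""))
    if match is None:
        return None
    trace_id, parent_span_id, flags = match.groups()
    return {
        "trace_id": trace_id,
        "parent_span_id": parent_span_id,
        "sampled": bool(int(flags, 16) & 0x01),
    }


def start_child_span(parent):
    """Worker side (hypothetical helper): keep the trace ID, generate a fresh span ID."""
    return {
        "trace_id": parent["trace_id"],
        "parent_span_id": parent["parent_span_id"],
        "span_id": secrets.token_hex(8),
    }


# Producer publishes a task message that carries the trace context.
publish_trace_id = secrets.token_hex(16)  # 32 hex characters
publish_span_id = secrets.token_hex(8)    # 16 hex characters
message_headers = inject({}, publish_trace_id, publish_span_id)

# Worker receives the message and continues the same trace.
parent_context = extract(message_headers)
run_span = start_child_span(parent_context)
print("same trace:", run_span["trace_id"] == publish_trace_id)
```

Because the worker's span reuses the producer's trace ID and records the producer's span ID as its parent, a tracing backend can join the publish and run spans into one trace. In a real deployment this only happens if the producer process is instrumented as well as the worker.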
