Dagster Prometheus Integration

0.29.0 · active · verified Fri Apr 17

Dagster-prometheus is a library that provides an integration for Dagster with Prometheus, allowing users to expose Dagster run and asset metrics, as well as custom metrics from their ops and assets. It is part of the Dagster ecosystem and typically releases new versions in lockstep with Dagster core releases. The current version is 0.29.0, corresponding to Dagster core 1.13.0.

Common errors

Warnings

Install

Imports

Quickstart

This example demonstrates how to define a Prometheus resource using `make_prometheus_resource`, attach it to a job, and then use the resource's exposed Prometheus registry within an op to emit custom metrics. The resource automatically starts an HTTP server for Prometheus to scrape metrics.

import os
from dagster import Definitions, job, op
from dagster_prometheus import make_prometheus_resource

# The Prometheus resource starts an HTTP server and exposes the
# prometheus_client registry for custom metrics.
prometheus_resource = make_prometheus_resource(
    port=int(os.environ.get("PROMETHEUS_PORT", "8000")),
    prefix=os.environ.get("PROMETHEUS_PREFIX", "dagster_"),
)

@op
def emit_custom_metrics_op(context):
    """
    An op that emits custom Prometheus metrics using the provided registry.
    The HTTP server will be running as long as the resource is active (e.g., during a job run).
    """
    registry = context.resources.prometheus_resource.registry
    
    # Create and increment a custom counter
    my_counter = registry.get_or_create_metric(
        "my_custom_run_counter",
        "A counter for runs of my_metrics_job",
        "counter"
    )
    my_counter.inc()
    
    # Create and set a custom gauge with labels
    my_gauge = registry.get_or_create_metric(
        "my_custom_value_gauge",
        "A gauge for a custom value",
        "gauge",
        labelnames=["job_name", "op_name"]
    )
    # Example value derived from run_id, ensures a different value each time for demo
    example_value = float(int(context.run_id.split('-')[0], 16) % 100)
    my_gauge.labels(job_name=context.job_name, op_name=context.op.name).set(example_value)

    context.log.info(f"Emitted custom metrics to port {context.resources.prometheus_resource.port}")


@job(resource_defs={"prometheus_resource": prometheus_resource})
def my_metrics_job():
    emit_custom_metrics_op()

defs = Definitions(jobs=[my_metrics_job])

# To run this example locally:
# 1. Save the code as a Python file (e.g., `metrics_repo.py`).
# 2. Run `dagster dev -f metrics_repo.py` in your terminal.
# 3. In the Dagster UI (typically localhost:3000), navigate to the Launchpad for `my_metrics_job` and click 'Launch'.
# 4. While the job is running (or if `dagster dev` is active), open your browser to `http://localhost:8000/metrics`
#    (or the port specified in PROMETHEUS_PORT if set) to view the exposed metrics.
#    You will see Dagster's default metrics along with 'dagster_my_custom_run_counter_total' and 'dagster_my_custom_value_gauge'.

view raw JSON →