Dagster Prometheus Integration
`dagster-prometheus` integrates Dagster with Prometheus, letting users expose Dagster run and asset metrics as well as custom metrics from their ops and assets. It is part of the Dagster ecosystem and typically releases new versions in lockstep with Dagster core. The current version is 0.29.0, corresponding to Dagster core 1.13.0.
Common errors
- ModuleNotFoundError: No module named 'dagster_prometheus'
  cause: The `dagster-prometheus` library is not installed in the active Python environment.
  fix: Run `pip install dagster-prometheus` to install the package.
- OSError: [Errno 98] Address already in use
  cause: The port specified for the Prometheus HTTP server (default 8000) is already in use by another process on your machine.
  fix: Pass a different port to `make_prometheus_resource` (e.g., `make_prometheus_resource(port=8001)`), or make sure no other application is listening on the default port.
- dagster._core.errors.DagsterInvalidConfigError: Error in config for resource "prometheus_resource": Field "port" is missing and not optional.
  cause: No `port` value reached the resource config. `make_prometheus_resource` needs the port to be provided explicitly or through config, which matters most when a config system overrides the defaults.
  fix: Provide the `port` argument when calling `make_prometheus_resource`, for example `make_prometheus_resource(port=8000)`.
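For the "Address already in use" case, one option is to probe for a free port before constructing the resource. The following is a minimal sketch using only the standard library. `is_port_free` and `pick_free_port` are hypothetical helper names and are not part of `dagster-prometheus`. The check is also inherently racy, since another process can take the port between the probe and the real bind.

```python
import socket


def is_port_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can currently bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError:
            # Typically EADDRINUSE (Errno 98 on Linux): the port is taken.
            return False
    return True


def pick_free_port(preferred: int = 8000, attempts: int = 20) -> int:
    """Return the first free port in [preferred, preferred + attempts)."""
    last = min(preferred + attempts, 65536)
    for candidate in range(preferred, last):
        if is_port_free(candidate):
            return candidate
    raise RuntimeError(f"no free port found in {preferred}..{last - 1}")
```

The result could then be passed as the `port` argument, for example `make_prometheus_resource(port=pick_free_port(8000))`.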
Warnings
- breaking Dagster's resource API underwent significant changes with the 1.0 release. If you are upgrading from Dagster core versions prior to 1.0, your existing resource definitions, including those for `dagster-prometheus`, will likely need to be updated to the new resource definition syntax.
- gotcha The Prometheus HTTP server started by `make_prometheus_resource` is bound to the lifespan of the Dagster process that initializes the resource (e.g., a specific job run process, or the `dagster dev` process). For continuous metric exposure in a production environment, ensure the Dagster process (e.g., `dagster-webserver`, `dagster-daemon`, or a persistent run process) is long-lived.
- gotcha `dagster-prometheus` is a library within the Dagster monorepo. While its own versioning (0.x.y) is separate from Dagster core (1.x.y), it is developed and released in tandem. Using mismatched versions of `dagster-prometheus` and `dagster` core can lead to unexpected behavior or `ImportError`s.
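A quick way to catch a version mismatch is to compare the installed distributions at startup. The sketch below assumes that the library's minor version equals the core minor version plus 16, which is inferred from the 0.29.0 / 1.13.0 pairing stated above and should be treated as an assumption rather than a documented rule. The helper names are hypothetical.

```python
from importlib import metadata
from typing import Optional


def expected_library_version(core_version: str) -> str:
    """Map a Dagster core 1.x.y version to the assumed 0.(x+16).y library version."""
    major, minor, patch = core_version.split(".", 2)
    if major != "1":
        raise ValueError(f"offset rule is only assumed for core 1.x, got {core_version}")
    return f"0.{int(minor) + 16}.{patch}"


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def versions_match() -> Optional[bool]:
    """Return True/False when both packages are installed, None otherwise."""
    core = installed_version("dagster")
    lib = installed_version("dagster-prometheus")
    if core is None or lib is None:
        return None
    return lib == expected_library_version(core)
```

Calling `versions_match()` before loading definitions gives an early, readable signal instead of an `ImportError` deep inside a run.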
Install
- pip install dagster-prometheus
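To confirm that the install landed in the interpreter Dagster will actually use, a small check like the one below can help. It is a sketch using only the standard library, and `is_installed` is a hypothetical helper name. It only handles top-level module names such as `dagster_prometheus`.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # Note the underscore: the import name differs from the pip package name.
    print("dagster_prometheus importable:", is_installed("dagster_prometheus"))
```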
Imports
- make_prometheus_resource
from dagster_prometheus import make_prometheus_resource
Quickstart
import os

from dagster import Definitions, job, op
from dagster_prometheus import make_prometheus_resource

# The Prometheus resource starts an HTTP server and exposes the
# prometheus_client registry for custom metrics.
prometheus_resource = make_prometheus_resource(
    port=int(os.environ.get("PROMETHEUS_PORT", "8000")),
    prefix=os.environ.get("PROMETHEUS_PREFIX", "dagster_"),
)


# The op must declare the resource key it reads from context.resources.
@op(required_resource_keys={"prometheus_resource"})
def emit_custom_metrics_op(context):
    """
    An op that emits custom Prometheus metrics using the provided registry.
    The HTTP server will be running as long as the resource is active (e.g., during a job run).
    """
    registry = context.resources.prometheus_resource.registry

    # Create and increment a custom counter
    my_counter = registry.get_or_create_metric(
        "my_custom_run_counter",
        "A counter for runs of my_metrics_job",
        "counter",
    )
    my_counter.inc()

    # Create and set a custom gauge with labels
    my_gauge = registry.get_or_create_metric(
        "my_custom_value_gauge",
        "A gauge for a custom value",
        "gauge",
        labelnames=["job_name", "op_name"],
    )
    # Example value derived from run_id, ensures a different value each time for demo
    example_value = float(int(context.run_id.split("-")[0], 16) % 100)
    my_gauge.labels(job_name=context.job_name, op_name=context.op.name).set(example_value)

    context.log.info(f"Emitted custom metrics to port {context.resources.prometheus_resource.port}")


@job(resource_defs={"prometheus_resource": prometheus_resource})
def my_metrics_job():
    emit_custom_metrics_op()


defs = Definitions(jobs=[my_metrics_job])

# To run this example locally:
# 1. Save the code as a Python file (e.g., `metrics_repo.py`).
# 2. Run `dagster dev -f metrics_repo.py` in your terminal.
# 3. In the Dagster UI (typically localhost:3000), navigate to the Launchpad for `my_metrics_job` and click 'Launch'.
# 4. While the job is running (or if `dagster dev` is active), open your browser to `http://localhost:8000/metrics`
#    (or the port specified in PROMETHEUS_PORT if set) to view the exposed metrics.
#    You will see Dagster's default metrics along with 'dagster_my_custom_run_counter_total' and 'dagster_my_custom_value_gauge'.
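
Instead of opening a browser for step 4, the endpoint can also be checked from a script. The sketch below assumes the server is reachable at `http://localhost:8000/metrics` and that the `dagster_` prefix from the quickstart is in use. `filter_metric_lines` and `fetch_metric_lines` are hypothetical helper names, and only the standard library is used.

```python
from typing import List
from urllib.request import urlopen


def filter_metric_lines(exposition_text: str, prefix: str = "dagster_") -> List[str]:
    """Keep sample lines whose metric name starts with the prefix.

    '# HELP' and '# TYPE' comment lines are dropped because they start with '#'.
    """
    return [
        line
        for line in exposition_text.splitlines()
        if line.startswith(prefix)
    ]


def fetch_metric_lines(
    url: str = "http://localhost:8000/metrics",
    prefix: str = "dagster_",
    timeout: float = 5.0,
) -> List[str]:
    """Download the metrics page and return only the prefixed sample lines."""
    with urlopen(url, timeout=timeout) as response:
        body = response.read().decode("utf-8")
    return filter_metric_lines(body, prefix)


if __name__ == "__main__":
    for sample in fetch_metric_lines():
        print(sample)
```

Keeping the filtering separate from the HTTP call makes it easy to test against a saved copy of the metrics page.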