Dagster PagerDuty
Dagster PagerDuty (`dagster-pagerduty`) is a library that integrates Dagster, a data orchestration platform, with PagerDuty, a SaaS incident response platform. It wraps PagerDuty's Events API V2 so that alerts can be created programmatically from Dagster code. The current version is 0.29.0. The library is developed in the Dagster monorepo, and library versions track core releases (e.g., core 1.13.0 corresponds to libraries 0.29.0).
Common errors
- ModuleNotFoundError: No module named 'dagster_pagerduty'
  cause: The `dagster-pagerduty` library has not been installed in the current Python environment.
  fix: Run `pip install dagster-pagerduty`.
- pagerduty.EventV2_create() failed: 401 Unauthorized / 400 Bad Request
  cause: This typically indicates an issue with the PagerDuty routing key or the structure of the event payload. A '401 Unauthorized' points to an invalid or missing routing key. A '400 Bad Request' suggests malformed data in the `EventV2_create` call.
  fix: Double-check that the `routing_key` provided to `PagerDutyService` is correct and active for an Events API V2 integration in PagerDuty. Review the `summary`, `source`, `severity`, and `event_action` parameters to ensure they conform to PagerDuty's Events API V2 specification.
- dagster._core.errors.DagsterInvalidConfigError: Received an invalid value for config 'resources': Expected 'pagerduty' to be a 'ResourceDefinition'.
  cause: This error occurs when an incorrect type or old-style definition is used for the PagerDuty resource, particularly with newer Dagster versions expecting Pythonic resources.
  fix: Ensure you are using `PagerDutyService` (a class-based resource) and correctly instantiating it, e.g., `resources={'pagerduty': PagerDutyService(routing_key='your_key')}`. Avoid using `@resource` decorated functions if you are on a modern Dagster version.
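For the 400 Bad Request case above, a pre-flight check can catch bad values before the call is made. The following is a minimal sketch, not part of `dagster-pagerduty`: `validate_event_args` is a hypothetical helper, and the allowed values are the ones PagerDuty documents for Events API V2.

```python
# Allowed values as documented for PagerDuty's Events API V2.
VALID_SEVERITIES = {"critical", "error", "warning", "info"}
VALID_EVENT_ACTIONS = {"trigger", "acknowledge", "resolve"}


def validate_event_args(summary, source, severity, event_action="trigger"):
    """Hypothetical helper: return a list of problems (empty means the values look valid)."""
    problems = []
    if not isinstance(summary, str) or not summary.strip():
        problems.append("summary must be a non-empty string")
    if not isinstance(source, str) or not source.strip():
        problems.append("source must be a non-empty string")
    if severity not in VALID_SEVERITIES:
        problems.append(
            f"severity must be one of {sorted(VALID_SEVERITIES)}, got {severity!r}"
        )
    if event_action not in VALID_EVENT_ACTIONS:
        problems.append(
            f"event_action must be one of {sorted(VALID_EVENT_ACTIONS)}, got {event_action!r}"
        )
    return problems
```

Calling this just before `EventV2_create` and logging or raising on a non-empty result turns a remote API error into a local, readable message.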
Warnings
- deprecated The `pagerduty_resource` function is considered legacy. Users on Dagster 1.3 or newer should migrate to the `PagerDutyService` class-based resource for improved type hinting and Pydantic-backed configuration.
- gotcha PagerDuty alerts require a valid Events API V2 integration key (routing key). This key must be configured correctly in PagerDuty for a specific service, and then provided to the `PagerDutyService` resource. An incorrect or missing key will result in API errors.
- gotcha Alerts triggered via `dagster-pagerduty` rely on the PagerDuty API. If PagerDuty itself is not configured to send notifications (e.g., no one is on call for the affected service, or suppression rules are active), the alert may be triggered in PagerDuty but not result in an actual notification.
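Because a missing routing key only surfaces as an API error when an alert is sent, it can help to fail fast at startup instead. A minimal sketch, assuming the key is supplied through an environment variable named `PAGERDUTY_ROUTING_KEY`; `load_routing_key` is a hypothetical helper, not a library function.

```python
import os


def load_routing_key(env_var="PAGERDUTY_ROUTING_KEY"):
    """Hypothetical helper: return the routing key, or raise if it is unset or blank."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(
            f"{env_var} is not set; create an Events API V2 integration "
            "on the PagerDuty service and export its integration key."
        )
    return key
```

The returned value can then be passed as `routing_key` when constructing `PagerDutyService`. This check only confirms that a key is present; it cannot tell whether PagerDuty will accept the key or deliver a notification.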
Install
- pip install dagster-pagerduty
Imports
- PagerDutyService
  legacy: from dagster_pagerduty import pagerduty_resource
  recommended: from dagster_pagerduty import PagerDutyService
- dagster
  import dagster as dg
Quickstart
import os

import dagster as dg
from dagster_pagerduty import PagerDutyService


@dg.asset
def critical_data_check(pagerduty: PagerDutyService):
    # Simulate a check that might fail
    if os.environ.get('SIMULATE_FAILURE') == 'true':
        summary_message = "Critical data check failed!"
        # Send the alert first, then fail the run
        pagerduty.EventV2_create(
            summary=summary_message,
            source="data_pipeline_monitor",
            severity="critical",
            event_action="trigger",
            # Extra context is passed via the custom_details keyword
            custom_details={
                "run_url": "https://dagster.example.com/runs/abc123",
                "error_message": "Data quality threshold breached."
            }
        )
        raise Exception(summary_message)
    else:
        print("Critical data check passed.")


defs = dg.Definitions(
    assets=[critical_data_check],
    resources={
        "pagerduty": PagerDutyService(routing_key=os.environ.get('PAGERDUTY_ROUTING_KEY', ''))
    },
)
# To run locally (ensure PAGERDUTY_ROUTING_KEY is set in your environment):
# os.environ['PAGERDUTY_ROUTING_KEY'] = 'your_pagerduty_integration_key'
# # Set SIMULATE_FAILURE to 'true' to trigger an alert
# # os.environ['SIMULATE_FAILURE'] = 'true'
# result = dg.materialize(
#     [critical_data_check],
#     resources={"pagerduty": PagerDutyService(routing_key=os.environ['PAGERDUTY_ROUTING_KEY'])},
# )
# assert result.success  # If SIMULATE_FAILURE is 'true', materialize() re-raises the asset's exception instead
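The alert-then-raise logic in the Quickstart can be exercised without contacting PagerDuty by swapping in a stand-in object. This is a sketch under stated assumptions: `FakePagerDuty` and `run_check` are hypothetical names, and `run_check` mirrors the asset body as a plain function so the example needs neither Dagster nor a routing key.

```python
class FakePagerDuty:
    """Hypothetical test double: records events instead of sending them."""

    def __init__(self):
        self.events = []

    def EventV2_create(self, summary, source, severity, event_action="trigger", **extra):
        # Store the call so a test can inspect what would have been sent.
        self.events.append(
            {
                "summary": summary,
                "source": source,
                "severity": severity,
                "event_action": event_action,
                **extra,
            }
        )


def run_check(pagerduty, failed):
    """Plain-function mirror of the Quickstart asset body (an assumption, not library code)."""
    if failed:
        summary_message = "Critical data check failed!"
        pagerduty.EventV2_create(
            summary=summary_message,
            source="data_pipeline_monitor",
            severity="critical",
            event_action="trigger",
        )
        raise Exception(summary_message)


fake = FakePagerDuty()
run_check(fake, failed=False)  # passing check: nothing recorded
try:
    run_check(fake, failed=True)  # failing check: one event, then an exception
except Exception as exc:
    print(f"raised: {exc}")
print(len(fake.events))
```

Keeping the check logic in a function that accepts any object with an `EventV2_create` method is a design choice that makes this kind of substitution straightforward.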