Dagster PagerDuty

0.29.0 · active · verified Thu Apr 16

Dagster PagerDuty (`dagster-pagerduty`) is a library that integrates Dagster, a data orchestration platform, with PagerDuty, a SaaS incident response platform. It lets users create alerts programmatically from Dagster code by wrapping PagerDuty's Events API V2. The library is currently at version 0.29.0 and is part of the larger Dagster monorepo; its version numbers track the core `dagster` releases (e.g., core 1.13.0 corresponds to library version 0.29.0).
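The version correspondence can be sketched as a small helper. This is only an illustration: the function name is hypothetical, and the fixed offset of 16 on the minor version is an assumption inferred from the single example above (core 1.13.0 and library 0.29.0), not something read from the package metadata.

```python
def library_version_for_core(core_version: str) -> str:
    """Map a core dagster 1.x.y version to the matching 0.x.y library version.

    Assumption: library minor = core minor + 16, inferred from 1.13.0 -> 0.29.0.
    """
    major, minor, patch = (int(part) for part in core_version.split("."))
    if major != 1:
        raise ValueError(f"only 1.x core versions are covered, got {core_version}")
    return f"0.{minor + 16}.{patch}"


print(library_version_for_core("1.13.0"))
```

When pinning dependencies, checking the published release notes is safer than relying on this arithmetic.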

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to define a `PagerDutyService` resource and use it within a Dagster asset to trigger a PagerDuty alert. The `routing_key` (the PagerDuty Events API V2 integration key) should be provided securely, typically via an environment variable. When the `SIMULATE_FAILURE` environment variable is set to `true`, the `critical_data_check` asset sends a `critical` trigger event to PagerDuty and then raises an exception so that the run fails.

import dagster as dg
from dagster_pagerduty import PagerDutyService
import os

@dg.asset
def critical_data_check(pagerduty: PagerDutyService):
    # Simulate a check that might fail
    if os.environ.get('SIMULATE_FAILURE') == 'true':
        summary_message = "Critical data check failed!"
        pagerduty.EventV2_create(
            summary=summary_message,
            source="data_pipeline_monitor",
            severity="critical",
            event_action="trigger",
            custom_details={  # extra context; maps to the Events API V2 "custom_details" field
                "run_url": "https://dagster.example.com/runs/abc123",
                "error_message": "Data quality threshold breached."
            }
        )
        raise Exception(summary_message)
    else:
        print("Critical data check passed.")

defs = dg.Definitions(
    assets=[critical_data_check],
    resources={
        # EnvVar is resolved at run time, so a missing key fails loudly instead of becoming ''
        "pagerduty": PagerDutyService(routing_key=dg.EnvVar("PAGERDUTY_ROUTING_KEY"))
    },
)

# To run locally (ensure PAGERDUTY_ROUTING_KEY is set in your environment):
# os.environ['PAGERDUTY_ROUTING_KEY'] = 'your_pagerduty_integration_key'
# # Set SIMULATE_FAILURE to 'true' to trigger an alert
# # os.environ['SIMULATE_FAILURE'] = 'true'
# result = dg.materialize(
#     [critical_data_check],
#     resources={"pagerduty": PagerDutyService(routing_key=os.environ['PAGERDUTY_ROUTING_KEY'])},
# )
# assert result.success  # Not reached if SIMULATE_FAILURE is 'true': materialize re-raises the asset's exception
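
For orientation, the event that the quickstart's `EventV2_create` call ultimately describes can be sketched as a plain Events API V2 request body. The helper name `build_event_v2` is hypothetical and is not part of `dagster-pagerduty`; the sketch only assembles the JSON structure (routing key, event action, and a `payload` with summary, source, severity, and optional `custom_details`) and sends nothing over the network.

```python
import json
from typing import Any, Optional

# Public Events API V2 endpoint (shown for reference; this sketch never calls it).
EVENTS_API_V2_URL = "https://events.pagerduty.com/v2/enqueue"

VALID_SEVERITIES = {"critical", "error", "warning", "info"}
VALID_ACTIONS = {"trigger", "acknowledge", "resolve"}


def build_event_v2(
    routing_key: str,
    summary: str,
    source: str,
    severity: str,
    event_action: str = "trigger",
    custom_details: Optional[dict[str, Any]] = None,
    dedup_key: Optional[str] = None,
) -> dict[str, Any]:
    """Hypothetical helper: assemble an Events API V2 request body as a dict."""
    if severity not in VALID_SEVERITIES:
        raise ValueError(f"unsupported severity: {severity!r}")
    if event_action not in VALID_ACTIONS:
        raise ValueError(f"unsupported event_action: {event_action!r}")

    event: dict[str, Any] = {
        "routing_key": routing_key,
        "event_action": event_action,
        "payload": {"summary": summary, "source": source, "severity": severity},
    }
    # Optional fields are only included when provided.
    if custom_details is not None:
        event["payload"]["custom_details"] = custom_details
    if dedup_key is not None:
        event["dedup_key"] = dedup_key
    return event


event = build_event_v2(
    routing_key="your_pagerduty_integration_key",  # placeholder, not a real key
    summary="Critical data check failed!",
    source="data_pipeline_monitor",
    severity="critical",
    custom_details={"error_message": "Data quality threshold breached."},
)
print(json.dumps(event, indent=2))
```

Building the body separately like this can be handy for unit-testing alert content without contacting PagerDuty.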
