PagerDuty Airflow Provider

5.2.4 · active · verified Sat Apr 11

The `apache-airflow-providers-pagerduty` package provides Apache Airflow integrations for PagerDuty, letting users send incident notifications and interact with the PagerDuty API directly from Airflow DAGs. It supports both the PagerDuty REST API and the Events API. The current version is 5.2.4; the package follows the Apache Airflow provider release cadence, with regular releases delivering new features, bug fixes, and compatibility updates.

Install
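
The provider is distributed on PyPI under the package name given above. A typical install, pinning the version shown in this page's metadata (adjust the pin to match your Airflow version's constraints):

```shell
pip install "apache-airflow-providers-pagerduty==5.2.4"
```

Installing provider packages into the same environment as Airflow itself is the standard approach; use Airflow's constraints files if you need reproducible dependency resolution.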

Imports
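
The names used on this page. The notifier import is taken verbatim from the quickstart below; the hook paths are the provider's standard module locations, but verify them against your installed version:

```python
# Notifier used in on_failure_callback (from the quickstart below)
from airflow.providers.pagerduty.notifications.pagerduty import send_pagerduty_notification

# Hooks for the PagerDuty REST API and the Events API
from airflow.providers.pagerduty.hooks.pagerduty import PagerdutyHook
from airflow.providers.pagerduty.hooks.pagerduty_events import PagerdutyEventsHook
```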

Quickstart

This quickstart demonstrates how to use `send_pagerduty_notification` as an `on_failure_callback` for both a DAG and a specific task. Ensure you have configured a PagerDuty connection in Airflow (Admin -> Connections, type 'Pagerduty' or 'Pagerduty Events') or provide the `integration_key` directly. For security, it's recommended to store sensitive keys in Airflow Connections or a secrets backend and retrieve them via `BaseHook.get_connection` rather than hard-coding them in DAG files.

import os
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.pagerduty.notifications.pagerduty import send_pagerduty_notification

PAGERDUTY_INTEGRATION_KEY = os.environ.get('PAGERDUTY_INTEGRATION_KEY', '')

with DAG(
    dag_id='pagerduty_notification_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['pagerduty', 'notification'],
    on_failure_callback=[
        send_pagerduty_notification(
            summary="The DAG {{ dag.dag_id }} failed!",
            severity="critical",
            source="airflow dag_id: {{dag.dag_id}}",
            dedup_key="{{dag.dag_id}}-failure",
            component="airflow",
            integration_key=PAGERDUTY_INTEGRATION_KEY,
        )
    ]
) as dag:
    start_task = BashOperator(
        task_id='start_task',
        bash_command='echo "Starting DAG..."',
    )

    failing_task = BashOperator(
        task_id='failing_task',
        bash_command='exit 1', # This task will intentionally fail
        on_failure_callback=[
            send_pagerduty_notification(
                summary="Task {{ ti.task_id }} failed in DAG {{ dag.dag_id }}",
                severity="error",
                source="airflow task_id: {{ti.task_id}}",
                dedup_key="{{dag.dag_id}}-{{ti.task_id}}-failure",
                component="airflow",
                integration_key=PAGERDUTY_INTEGRATION_KEY,
            )
        ]
    )

    success_task = BashOperator(
        task_id='success_task',
        # Note: this task never runs in this example — failing_task fails,
        # so success_task is marked upstream_failed. It exists only to show
        # where downstream work would go.
        bash_command='echo "DAG succeeded!"',
    )

    start_task >> failing_task >> success_task
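
Under the hood, the notifier delivers an Events API v2 `trigger` event to PagerDuty. As a rough illustration of what reaches the service, here is a standard-library-only sketch that assembles such a payload; the field names follow the public Events API v2 format, `build_trigger_event` is a hypothetical helper for this example, and `ROUTING_KEY` is a placeholder, not a real key:

```python
import json

ROUTING_KEY = "<your-integration-key>"  # placeholder — supply your own integration key

def build_trigger_event(summary, severity, source, dedup_key=None, component=None):
    """Assemble an Events API v2 'trigger' payload as a dict."""
    event = {
        "routing_key": ROUTING_KEY,
        "event_action": "trigger",
        "payload": {
            "summary": summary,
            "severity": severity,
            "source": source,
        },
    }
    if dedup_key:
        # Same dedup_key groups repeated triggers into one incident
        event["dedup_key"] = dedup_key
    if component:
        event["payload"]["component"] = component
    return event

event = build_trigger_event(
    summary="The DAG pagerduty_notification_example failed!",
    severity="critical",
    source="airflow dag_id: pagerduty_notification_example",
    dedup_key="pagerduty_notification_example-failure",
    component="airflow",
)
print(json.dumps(event, indent=2))
```

The hook and notifier handle the HTTP POST (to the Events API enqueue endpoint) and error handling for you; this sketch only shows the shape of the event so the `summary`, `severity`, `source`, `dedup_key`, and `component` parameters in the quickstart are easier to map onto what PagerDuty receives.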
