Opsgenie Provider for Apache Airflow

5.10.3 · active · verified Mon Apr 13

The `apache-airflow-providers-opsgenie` package integrates Apache Airflow with Opsgenie, a modern incident management platform. It allows Airflow DAGs to create, close, and manage alerts and incidents within Opsgenie, enhancing observability and response capabilities for workflows. The current version is 5.10.3, and it follows the frequent release cadence typical of Apache Airflow providers.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to use the `OpsgenieCreateAlertOperator` to trigger an alert and `OpsgenieCloseAlertOperator` to resolve it within an Airflow DAG. It assumes an Opsgenie connection named `opsgenie_default` is configured in the Airflow UI with the Opsgenie API Key. The `alert_id` from the created alert is XCom-pulled to close the corresponding alert.

import os
from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.opsgenie.operators.opsgenie import (
    OpsgenieCreateAlertOperator,
    OpsgenieCloseAlertOperator,
)

# Ensure 'opsgenie_default' connection is configured in Airflow UI:
# Admin -> Connections
# Conn Id: opsgenie_default
# Conn Type: Opsgenie
# Host: (Optional, defaults to https://api.opsgenie.com)
# Password: Your Opsgenie API Key (e.g., os.environ.get('OPSGENIE_API_KEY', ''))

with DAG(
    dag_id="opsgenie_alert_example_dag",
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["opsgenie", "alerting"],
) as dag:
    create_alert = OpsgenieCreateAlertOperator(
        task_id="create_opsgenie_alert",
        message="High priority alert from Airflow DAG!",
        recipients=["team_id:your_team_id"], # Or 'email:your_email@example.com'
        priority="P1",
        teams=[{"name": "your_team_name"}],
        description="Automated alert for critical workflow failure.",
        tags=["airflow", "critical", "pipeline"],
        opsgenie_conn_id="opsgenie_default",
    )

    # In a real scenario, this would typically be triggered by another task's success or failure
    close_alert = OpsgenieCloseAlertOperator(
        task_id="close_opsgenie_alert",
        identifier="{{ task_instance.xcom_pull(task_ids='create_opsgenie_alert')['alert_id'] }}",
        identifier_type="id",
        user="Airflow Automated System",
        note="Alert successfully resolved by downstream tasks.",
        opsgenie_conn_id="opsgenie_default",
    )

    create_alert >> close_alert

view raw JSON →