Opsgenie Provider for Apache Airflow
The `apache-airflow-providers-opsgenie` package integrates Apache Airflow with Opsgenie, a modern incident management platform. It allows Airflow DAGs to create, close, and manage alerts and incidents within Opsgenie, enhancing observability and response capabilities for workflows. The current version is 5.10.3, and it follows the frequent release cadence typical of Apache Airflow providers.
Warnings
- breaking The current version of `apache-airflow-providers-opsgenie` (5.x.x) requires Apache Airflow version 2.11.0 or higher. Installing on older Airflow versions (<2.11.0) will automatically upgrade Airflow, potentially causing database migration issues if not handled manually.
- breaking The `OpsgenieAlertOperator` was removed in provider version 5.0.0. It has been replaced by `OpsgenieCreateAlertOperator` for creating alerts and `OpsgenieDeleteAlertOperator` for deleting alerts. Additionally, the `hooks.opsgenie_alert` import path was removed.
- breaking In provider version 3.0.0, significant changes were introduced to `OpsgenieAlertHook`. The constructor no longer accepts additional arguments or keyword arguments, `get_conn` now returns an `opsgenie_sdk.AlertApi` object instead of a `requests.Session`, and the `execute` method was replaced by `create_alert`. Payload keys for fields like `visible_to` and `request_id` were also standardized to use snake_case instead of camelCase.
- gotcha To authenticate with Opsgenie, you must create an Airflow connection. The Opsgenie API Key should be stored in the 'Password' field of an 'Opsgenie' type connection in the Airflow UI (Admin -> Connections). The default connection ID is `opsgenie_default`.
- gotcha On fresh installations, the dependency resolver introduced in `pip` 20.3 could cause resolution failures for provider packages. This is rare with modern `pip` versions, but if you hit resolver errors during installation, pinning an older `pip` or passing `--use-deprecated=legacy-resolver` may help.
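As an alternative to configuring the connection in the UI (see the gotcha above), Airflow can read connections from environment variables. A minimal sketch, assuming the API key lives in an `OPSGENIE_API_KEY` environment variable (a placeholder name) and the default `opsgenie_default` connection id:

```python
import os

# Airflow resolves AIRFLOW_CONN_<CONN_ID> (upper-cased) as a connection URI.
# For the 'opsgenie' connection type, the API key goes in the password slot
# of the URI. 'OPSGENIE_API_KEY' is a placeholder variable name.
api_key = os.environ.get("OPSGENIE_API_KEY", "dummy-key")
os.environ["AIRFLOW_CONN_OPSGENIE_DEFAULT"] = f"opsgenie://:{api_key}@"
```

This must be set in the environment of the Airflow scheduler and workers before tasks run.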
Install
pip install apache-airflow-providers-opsgenie
Imports
- OpsgenieCreateAlertOperator
from airflow.providers.opsgenie.operators.opsgenie import OpsgenieCreateAlertOperator
- OpsgenieCloseAlertOperator
from airflow.providers.opsgenie.operators.opsgenie import OpsgenieCloseAlertOperator
- OpsgenieNotifier
from airflow.providers.opsgenie.notifications.opsgenie import OpsgenieNotifier
- OpsgenieAlertHook
from airflow.providers.opsgenie.hooks.opsgenie import OpsgenieAlertHook
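The hook can also be used outside an operator. A minimal sketch of the post-3.0.0 API described in the warnings above, with snake_case payload keys; the team name and alert fields are placeholders:

```python
from typing import Any

# Since provider 3.0.0, OpsgenieAlertHook.create_alert takes a snake_case
# payload dict (e.g. 'visible_to', not 'visibleTo') and get_conn returns an
# opsgenie_sdk.AlertApi rather than a requests.Session.
payload: dict[str, Any] = {
    "message": "Disk usage above 90%",  # the only required field
    "description": "Raised via the hook directly, outside a DAG.",
    "responders": [{"type": "team", "name": "your_team_name"}],  # placeholder
    "tags": ["airflow", "disk"],
    "priority": "P2",
}

def send_alert(alert_payload: dict[str, Any]) -> None:
    # Requires the provider installed and an 'opsgenie_default' connection.
    from airflow.providers.opsgenie.hooks.opsgenie import OpsgenieAlertHook

    hook = OpsgenieAlertHook(opsgenie_conn_id="opsgenie_default")
    hook.create_alert(alert_payload)
```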
Quickstart
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.opsgenie.operators.opsgenie import (
    OpsgenieCloseAlertOperator,
    OpsgenieCreateAlertOperator,
)

# Ensure the 'opsgenie_default' connection is configured in the Airflow UI:
#   Admin -> Connections
#   Conn Id:   opsgenie_default
#   Conn Type: Opsgenie
#   Host:      (optional, defaults to https://api.opsgenie.com)
#   Password:  your Opsgenie API key

with DAG(
    dag_id="opsgenie_alert_example_dag",
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["opsgenie", "alerting"],
) as dag:
    create_alert = OpsgenieCreateAlertOperator(
        task_id="create_opsgenie_alert",
        message="High priority alert from Airflow DAG!",
        alias="airflow-critical-pipeline-alert",  # stable identifier for later lookup
        responders=[{"type": "team", "name": "your_team_name"}],
        priority="P1",
        description="Automated alert for critical workflow failure.",
        tags=["airflow", "critical", "pipeline"],
        opsgenie_conn_id="opsgenie_default",
    )

    # In a real scenario, this would typically be triggered by another task's
    # success or failure. The alert is closed via the alias set above, since
    # the create operator does not push the Opsgenie alert id to XCom.
    close_alert = OpsgenieCloseAlertOperator(
        task_id="close_opsgenie_alert",
        identifier="airflow-critical-pipeline-alert",
        identifier_type="alias",
        user="Airflow Automated System",
        note="Alert successfully resolved by downstream tasks.",
        opsgenie_conn_id="opsgenie_default",
    )

    create_alert >> close_alert
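Beyond explicit alert tasks, the `OpsgenieNotifier` imported above can fire an alert automatically when a DAG or task fails. A minimal sketch, assuming the default connection id; the message and tags are placeholders, and the callback pattern shown is the standard Airflow `on_failure_callback` hook point:

```python
# The notifier accepts the same create-alert payload fields as the operator.
# The message below uses Jinja templating, which notifier fields support.
notifier_payload = {
    "message": "DAG {{ dag.dag_id }} failed on {{ ds }}",
    "priority": "P2",
    "tags": ["airflow", "failure"],
}

def make_failure_callback():
    # Requires apache-airflow-providers-opsgenie installed.
    from airflow.providers.opsgenie.notifications.opsgenie import OpsgenieNotifier

    return OpsgenieNotifier(payload=notifier_payload, opsgenie_conn_id="opsgenie_default")

# Usage inside a DAG definition (assumed pattern):
#   with DAG(..., on_failure_callback=make_failure_callback()) as dag:
#       ...
```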