DataHub Airflow Plugin
The `acryl-datahub-airflow-plugin` library integrates Apache Airflow with DataHub, automatically capturing metadata, lineage, and run information from DAGs and tasks and sending it to DataHub. It extracts column-level lineage from supported SQL operators and captures Airflow DAG and task properties, ownership, tags, and task run statuses. The plugin requires Airflow 2.7+ and Python 3.10+ and is currently at version 1.5.0.6, with a frequent release cadence tied to the broader DataHub project.
Warnings
- breaking: Python 3.9 support has been dropped; all `acryl-datahub` modules, including the Airflow plugin, now require Python 3.10 or later. Upgrade Python before upgrading the plugin.
- breaking: Support for Airflow versions below 2.7 has been dropped. Users on older Airflow versions must upgrade Airflow or pin to an older plugin version.
- breaking: The v1 plugin (`DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true`) has been removed; the v2 plugin is now the default. Users who explicitly set `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true` must migrate to the v2 plugin or pin to an older plugin version.
- breaking: The latest DataHub Airflow plugin is not compatible with Airflow 3.2+ because it imports `airflow.models.mappedoperator`, which has been removed, resulting in a `ModuleNotFoundError`.
- gotcha: Airflow 3.0.6 pins `pydantic==2.11.7`, which contains a bug that prevents the DataHub plugin from importing correctly. This is resolved in Airflow 3.1.0+ (which uses `pydantic>=2.11.8`).
- gotcha: The "kill switch" that disables the plugin differs between Airflow versions. On Airflow 2.x, set the Airflow Variable `datahub_airflow_plugin_disable_listener` to `true`; on Airflow 3.x, set the environment variable `AIRFLOW_VAR_DATAHUB_AIRFLOW_PLUGIN_DISABLE_LISTENER=true`.
- gotcha: Errors like "Unable to emit metadata to DataHub GMS" often stem from incorrect URL encoding in the Airflow DataHub connection string, specifically the `/api/gms` path. Airflow may not correctly interpret unencoded slashes in connection hosts.
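A minimal sketch of the encoding fix for the last gotcha, using only the standard library (the hostname, `/api/gms` path, and `<auth-token>` placeholder are illustrative assumptions, not values from your deployment):

```python
from urllib.parse import quote

# Hypothetical DataHub GMS endpoint that includes a path component.
gms_url = "https://example.acryl.io/api/gms"

# Percent-encode everything, including the slashes, so Airflow's URI parser
# does not misread the path when the connection is supplied as a URI.
encoded_host = quote(gms_url, safe="")
print(encoded_host)  # https%3A%2F%2Fexample.acryl.io%2Fapi%2Fgms

# The encoded host can then be embedded in a URI-style connection, e.g. one
# exported via an AIRFLOW_CONN_* environment variable:
conn_uri = f"datahub-rest://:<auth-token>@{encoded_host}"
print(conn_uri)
```

This avoids the ambiguity that arises when Airflow splits an unencoded `host/path` string at the first slash.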
Install
- Airflow 2.x: `pip install 'acryl-datahub-airflow-plugin[airflow2]'`
- Airflow 2.x, installing the OpenLineage provider separately: `pip install 'apache-airflow-providers-openlineage>=1.0.0'` then `pip install acryl-datahub-airflow-plugin`
- Airflow 3.x: `pip install 'acryl-datahub-airflow-plugin[airflow3]'`
- Airflow 3.x with Kafka-based emission: `pip install 'acryl-datahub-airflow-plugin[airflow3,datahub-kafka]'`
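After installing, one quick sanity check is to confirm which plugin version actually resolved; this sketch queries installed package metadata via the standard library (the package name is taken from the install commands above):

```python
from importlib import metadata

# Report the installed plugin version, or note its absence; 1.5.x is the
# current release line at the time of writing.
try:
    version = metadata.version("acryl-datahub-airflow-plugin")
    print(f"acryl-datahub-airflow-plugin {version}")
except metadata.PackageNotFoundError:
    print("acryl-datahub-airflow-plugin is not installed")
```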
Imports
- `prepare_lineage`, `apply_lineage`
from datahub_airflow_plugin.operators.lineage import prepare_lineage, apply_lineage
Quickstart
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Ensure a DataHub REST connection is configured in the Airflow UI or via the CLI:
# airflow connections add --conn-type 'datahub-rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' --conn-password "$DATAHUB_AUTH_TOKEN"

with DAG(
    dag_id='datahub_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # `schedule_interval` is deprecated in Airflow 2.x and removed in 3.x
    catchup=False,
    tags=['datahub', 'lineage', 'example'],
) as dag:
    start_task = BashOperator(
        task_id='start_task',
        bash_command='echo "Starting DAG"',
    )

    # A supported SQL operator (e.g. PostgresOperator, BigQueryInsertJobOperator;
    # not shown here for brevity) would get lineage automatically: the plugin
    # extracts it from OpenLineage events.
    process_data_task = BashOperator(
        task_id='process_data_task',
        bash_command='echo "Processing data..." && sleep 5',
        # For manual lineage, use inlets/outlets (table-level only), e.g.:
        # from datahub_airflow_plugin.entities import Dataset
        # inlets=[Dataset("postgres", "mydb.public.source_table")],
        # outlets=[Dataset("postgres", "mydb.public.target_table")],
    )

    end_task = BashOperator(
        task_id='end_task',
        bash_command='echo "DAG finished"',
    )

    start_task >> process_data_task >> end_task