DataHub Airflow Plugin


The `acryl-datahub-airflow-plugin` library integrates Apache Airflow with DataHub, automatically capturing metadata, lineage, and run information from DAGs and tasks. It extracts column-level lineage from supported SQL operators and also captures DAG and task properties, ownership, tags, and task run statuses. The plugin requires Airflow 2.7+ and Python 3.10+ and is currently at version 1.5.0.6, with a frequent release cadence tied to the broader DataHub project.

Warnings

Install
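Assuming a standard PyPI setup, the plugin installs with pip. The DataHub docs also describe a `plugin-v2` extra that bundles the automatic lineage extractors; check the docs for your version before relying on it:

```shell
pip install acryl-datahub-airflow-plugin

# Optional: the plugin-v2 extra described in the DataHub docs
# (verify it applies to your plugin version and deployment):
# pip install 'acryl-datahub-airflow-plugin[plugin-v2]'
```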

Imports

No imports are needed in DAG code for automatic extraction. For manual lineage annotations, the plugin exposes entity helpers, e.g.:

from datahub_airflow_plugin.entities import Dataset, Urn

Quickstart

This quickstart demonstrates a basic Airflow DAG. Once the `acryl-datahub-airflow-plugin` is installed and a 'DataHub REST Server' connection named `datahub_rest_default` is configured in Airflow, the plugin automatically extracts metadata and lineage for supported operators (like SQL operators or those using native Airflow Datasets/Assets) without explicit Python imports in the DAG code. For custom operators, you might need to use `inlets` and `outlets` or `prepare_lineage`/`apply_lineage` decorators. Ensure your DataHub GMS host is accessible from Airflow.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# For the manual lineage example below (commented out):
# from datahub_airflow_plugin.entities import Dataset

# Ensure DataHub REST connection is configured in Airflow UI or via CLI:
# airflow connections add --conn-type 'datahub-rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' --conn-password "$DATAHUB_AUTH_TOKEN"

with DAG(
    dag_id='datahub_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # schedule_interval is deprecated in Airflow 2.4+
    catchup=False,
    tags=['datahub', 'lineage', 'example'],
) as dag:
    start_task = BashOperator(
        task_id='start_task',
        bash_command='echo "Starting DAG"',
    )

    # A supported SQL operator (e.g. PostgresOperator, BigQueryInsertJobOperator)
    # would get lineage extracted automatically via the plugin's OpenLineage-based
    # extractors; a plain BashOperator is used here for brevity.
    process_data_task = BashOperator(
        task_id='process_data_task',
        bash_command='echo "Processing data..." && sleep 5',
        # For manual lineage, pass Dataset entities via inlets/outlets
        # (table-level only):
        # inlets=[Dataset("postgres", "mydb.public.source_table")],
        # outlets=[Dataset("postgres", "mydb.public.target_table")],
    )

    end_task = BashOperator(
        task_id='end_task',
        bash_command='echo "DAG finished"',
    )

    start_task >> process_data_task >> end_task
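Each inlet/outlet dataset ultimately resolves to a DataHub dataset URN of the form `urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)`. A minimal sketch of that construction (the helper name `make_dataset_urn` here is illustrative; the `datahub` SDK ships its own builder):

```python
def make_dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    """Build a DataHub dataset URN: platform, fully qualified name, environment."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"

print(make_dataset_urn("postgres", "mydb.public.source_table"))
# urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.source_table,PROD)
```

This is the identifier you would see on the dataset page in the DataHub UI after the lineage is emitted.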
