OpenLineage Airflow Provider

2.13.0 · active · verified Fri Apr 10

The OpenLineage Airflow Provider integrates Apache Airflow with OpenLineage, an open framework for data lineage collection and analysis. It registers Airflow listener hooks that extract metadata from DAG and task runs and send lineage events to an OpenLineage-compatible backend. The provider is currently at version 2.13.0. It is released independently of Airflow core and follows the Apache Airflow providers support policy, under which the minimum supported Airflow version is typically raised roughly every 12 months.
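For orientation, the events the listener sends follow the OpenLineage RunEvent schema. The sketch below builds a minimal START event by hand using only the standard library. It is illustrative, not the provider's own code: the `minimal_run_event` helper is hypothetical, and the `<dag_id>.<task_id>` job naming, the `default` namespace, the placeholder producer URL and the schema URL version are assumptions.

```python
import json
import uuid
from datetime import datetime, timezone


def minimal_run_event(dag_id: str, task_id: str, event_type: str = "START",
                      namespace: str = "default") -> dict:
    """Build a minimal OpenLineage RunEvent dict for one Airflow task run.

    Assumptions (illustrative only): the job name is "<dag_id>.<task_id>",
    the namespace defaults to "default", and the producer/schema URLs are
    placeholders rather than what the provider actually emits.
    """
    return {
        "eventType": event_type,  # e.g. START, COMPLETE, FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},  # one id per task-instance run
        "job": {"namespace": namespace, "name": f"{dag_id}.{task_id}"},
        "inputs": [],   # left empty here; real events list datasets when known
        "outputs": [],
        "producer": "https://example.com/hypothetical-producer",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/definitions/RunEvent",
    }


event = minimal_run_event("openlineage_example_dag", "process_data")
print(json.dumps(event, indent=2))
```

A real event from the provider carries more facets (Airflow-specific run and job details), but the top-level shape is the same.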

Install

pip install apache-airflow-providers-openlineage

Quickstart

After installing the provider, the core setup step is configuring the OpenLineage transport, which specifies where lineage events are sent. This is typically done by setting the `AIRFLOW__OPENLINEAGE__TRANSPORT` environment variable in the environment of the Airflow scheduler and workers, or by adding a `transport` entry to the `[openlineage]` section of `airflow.cfg`. Existing DAGs generally need no modification, because the provider observes task execution through Airflow's listener mechanism.
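Airflow maps environment variables of the form `AIRFLOW__{SECTION}__{KEY}` onto `airflow.cfg` options, so both the variable name and its JSON value can be generated instead of hand-quoted. A minimal sketch, where `airflow_env_name` and `transport_env` are hypothetical helpers and the Marquez URL is the example value used in the quickstart:

```python
import json
from typing import Tuple


def airflow_env_name(section: str, key: str) -> str:
    """Return the env-var name that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def transport_env(url: str, endpoint: str = "api/v1/lineage") -> Tuple[str, str]:
    """Return (name, value) for an HTTP OpenLineage transport (hypothetical helper)."""
    value = json.dumps({"type": "http", "url": url, "endpoint": endpoint})
    return airflow_env_name("openlineage", "transport"), value


name, value = transport_env("http://localhost:5000")
# Print a shell line to place in the scheduler/worker environment.
print(f"export {name}='{value}'")
```

Generating the value with `json.dumps` guarantees valid JSON, which avoids a common source of silent misconfiguration when the string is typed by hand.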

# 1. Install the provider (see the Install section).
# 2. Configure the OpenLineage transport via an environment variable or airflow.cfg.
#    This example sends events to a local Marquez instance (http://localhost:5000).
#
# Recommended method: set the environment variable for the scheduler, workers and
# other Airflow components *before* they start, for example in your shell:
#
#   export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
#
# Setting os.environ inside a DAG file is not a reliable substitute: the variable
# must be present in the environment of the Airflow processes that run the
# listener, not only in the code that parses the DAG.

# Alternatively, add this to your airflow.cfg under the [openlineage] section:
# [openlineage]
# transport = {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}

# No changes to user DAG files are typically required for basic lineage collection.
# The provider's listener emits events automatically as tasks run.

import pendulum

from airflow import DAG

try:
    # Airflow 3 (and Airflow 2 with the standard provider installed)
    from airflow.providers.standard.operators.bash import BashOperator
except ImportError:
    # Older Airflow 2.x location
    from airflow.operators.bash import BashOperator

with DAG(
    dag_id='openlineage_example_dag',
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),  # days_ago() is deprecated and removed in Airflow 3
    schedule=None,  # 'schedule_interval' was removed in Airflow 3
    catchup=False,
    tags=['openlineage', 'example'],
) as dag:
    start_task = BashOperator(
        task_id='start_task',
        bash_command='echo "Starting lineage test..."',
    )

    process_data = BashOperator(
        task_id='process_data',
        bash_command='echo "Processing some data..." && sleep 5',
    )

    end_task = BashOperator(
        task_id='end_task',
        bash_command='echo "Lineage test complete!"',
    )

    start_task >> process_data >> end_task

# Trigger the DAG (from the UI or with `airflow dags trigger openlineage_example_dag`)
# and check your backend for the lineage events emitted for each task.
