OpenLineage Airflow Integration

1.45.0 · active · verified Thu Apr 09

The `openlineage-airflow` library provides an integration for Apache Airflow to emit lineage metadata to an OpenLineage backend. It captures information about DAGs, tasks, and data interactions, contributing to a comprehensive data lineage graph. The latest version is 1.45.0, and new versions are released frequently, often bi-weekly, aligning with the broader OpenLineage project.
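Each task run is reported to the backend as OpenLineage RunEvents. A simplified START event is sketched below; the field set follows the OpenLineage spec, but the runId, producer, and schemaURL values here are illustrative placeholders:

```json
{
  "eventType": "START",
  "eventTime": "2023-01-01T00:00:00.000Z",
  "run": { "runId": "d46e465b-d358-4d32-83d4-df660ff614dd" },
  "job": {
    "namespace": "default",
    "name": "openlineage_example_dag.process_data_task"
  },
  "inputs": [],
  "outputs": [],
  "producer": "https://github.com/OpenLineage/OpenLineage",
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json"
}
```

For Airflow, the job name is the DAG id and task id joined with a dot; COMPLETE and FAIL events for the same run reuse the same `runId`.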

Warnings
The standalone `openlineage-airflow` package targets older Airflow releases (roughly 2.1–2.6). Starting with Airflow 2.7, OpenLineage support ships natively as the `apache-airflow-providers-openlineage` provider package, which should be used instead; do not install both side by side.

Install
pip install openlineage-airflow

Install the package into the same environment as Airflow, then restart the scheduler and workers so the plugin is picked up.

Imports
For basic lineage capture, no OpenLineage imports are needed in DAG files; the plugin registers itself through Airflow's plugin mechanism. Only standard Airflow imports are required:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

Quickstart

This quickstart defines a basic Airflow DAG. The `openlineage-airflow` plugin, once installed and configured with `OPENLINEAGE_URL` and optionally `OPENLINEAGE_API_KEY` in the Airflow environment, will automatically capture and emit lineage events for this DAG's runs and tasks. No explicit OpenLineage imports are needed within the DAG file for basic functionality.
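Before triggering runs, point the integration at your backend through environment variables in the Airflow environment. The values below are placeholders; `OPENLINEAGE_NAMESPACE` is optional and defaults to `default`:

```shell
# URL of an OpenLineage-compatible backend (e.g. Marquez) -- placeholder value
export OPENLINEAGE_URL="http://localhost:5000"
# Optional API key, only if the backend requires authentication -- placeholder
export OPENLINEAGE_API_KEY="my-api-key"
# Optional namespace under which this Airflow instance's jobs are grouped
export OPENLINEAGE_NAMESPACE="my_airflow_instance"
```

If `OPENLINEAGE_URL` is unset, the integration has nowhere to send events and lineage is silently dropped.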

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

# Ensure OPENLINEAGE_URL is set in your Airflow environment for events to be sent.
# For example: export OPENLINEAGE_URL="http://localhost:5000"
# If your OpenLineage backend requires authentication, also set OPENLINEAGE_API_KEY.

with DAG(
    dag_id="openlineage_example_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["openlineage", "example"],
) as dag:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo 'Starting OpenLineage example DAG'",
    )

    # The OpenLineage plugin automatically captures dataset-level lineage from
    # supported operators and frameworks (e.g. SQL operators, Spark). A plain
    # BashOperator only produces job- and run-level events. For dataset lineage,
    # use a supported operator instead, for example:
    #   from airflow.providers.postgres.operators.postgres import PostgresOperator
    #   PostgresOperator(task_id='insert_data', sql='INSERT INTO output_table SELECT * FROM input_table;')
    # To exercise a single task from the CLI:
    #   airflow tasks test openlineage_example_dag process_data_task 2023-01-01
    process_data = BashOperator(
        task_id="process_data_task",
        bash_command="""
            echo "Simulating data processing..."
            sleep 5
            echo "Data processed!"
        """,
    )

    end_task = BashOperator(
        task_id="end_task",
        bash_command="echo 'OpenLineage example DAG finished'",
    )

    start_task >> process_data >> end_task
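Under the hood, the plugin serializes RunEvents and POSTs them to the configured backend. The stdlib-only sketch below mimics that emission; the real integration uses the `openlineage-python` client, and `build_run_event`, `emit`, and the producer/schemaURL values here are illustrative, not the plugin's actual internals:

```python
import json
import os
import uuid
from datetime import datetime, timezone
from urllib import request


def build_run_event(event_type: str, dag_id: str, task_id: str,
                    namespace: str = "default") -> dict:
    """Assemble a minimal OpenLineage RunEvent payload (illustrative sketch)."""
    return {
        "eventType": event_type,  # START, COMPLETE, or FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},
        # Airflow job names are "<dag_id>.<task_id>"
        "job": {"namespace": namespace, "name": f"{dag_id}.{task_id}"},
        "inputs": [],
        "outputs": [],
        "producer": "https://github.com/OpenLineage/OpenLineage",
        "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json",
    }


def emit(event: dict) -> None:
    """POST the event to $OPENLINEAGE_URL/api/v1/lineage (the endpoint used by
    the OpenLineage HTTP transport); does nothing if the variable is unset."""
    url = os.environ.get("OPENLINEAGE_URL")
    if not url:
        return
    req = request.Request(
        f"{url}/api/v1/lineage",
        data=json.dumps(event).encode(),
        headers={"Content-Type": "application/json"},
    )
    request.urlopen(req)


event = build_run_event("START", "openlineage_example_dag", "process_data_task")
emit(event)
```

One event is emitted at task start and another (COMPLETE or FAIL) at task end, sharing the same `runId` so the backend can correlate them into a single run.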
