OpenLineage Airflow Integration
The `openlineage-airflow` library provides an integration for Apache Airflow to emit lineage metadata to an OpenLineage backend. It captures information about DAG runs, task execution, and the datasets tasks read and write, building a comprehensive data lineage graph. The latest version is 1.45.0, and new versions are released frequently, often bi-weekly, aligning with the broader OpenLineage project.
Warnings
- breaking Version 1.40.0 temporarily removed `__version__` attributes from top-level modules; this was fixed in 1.40.1. If your codebase relies on programmatic access to the library's version string (e.g., `openlineage_airflow.__version__`), it will fail on 1.40.0, so pin 1.40.1 or later.
- gotcha The OpenLineage Airflow integration requires Apache Airflow 2.0.0 or newer. Using it with older Airflow versions will lead to compatibility issues or outright failures.
- gotcha For OpenLineage events to be successfully sent, you must configure the OpenLineage backend URL and, if applicable, an API key. This is typically done via environment variables (`OPENLINEAGE_URL`, `OPENLINEAGE_API_KEY`) or within `airflow.cfg`. Misconfiguration will result in events not reaching your OpenLineage collector.
- gotcha The `openlineage-airflow` integration functions as an Airflow plugin and is automatically loaded by Airflow on startup. You generally do not need to add any specific imports or decorators to your DAG files for basic lineage collection to work. Users expecting explicit Python code to activate the integration might overlook this implicit behavior.
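As a concrete sketch of the environment-variable configuration described above (all values below are placeholders; point them at your own OpenLineage backend):

```shell
# Placeholder values -- replace with your own backend details.
export OPENLINEAGE_URL="http://localhost:5000"
export OPENLINEAGE_API_KEY="my-api-key"        # only if your backend requires auth
export OPENLINEAGE_NAMESPACE="my_namespace"    # optional logical namespace for jobs
```

These must be set in the environment of the Airflow scheduler and workers before they start, not inside DAG code.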
Install
pip install openlineage-airflow
Imports
- OpenLineageClient
from openlineage.client import OpenLineageClient
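For orientation, this sketch builds the kind of START run-event payload that `OpenLineageClient.emit()` sends over HTTP. It uses only the standard library so it runs without the client installed; the namespace, job name, and producer URI are illustrative placeholders, not values the library mandates.

```python
import json
import uuid
from datetime import datetime, timezone

def build_start_event(namespace: str, job_name: str) -> dict:
    # Field names follow the OpenLineage event model: a run (with a unique
    # runId), a job (namespace + name), and the producer that emitted it.
    return {
        "eventType": "START",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": namespace, "name": job_name},
        "producer": "https://example.com/my-producer",  # placeholder URI
    }

event = build_start_event("my_namespace", "openlineage_example_dag.process_data_task")
print(json.dumps(event, indent=2))
```

With the Airflow integration you normally never construct events yourself; the plugin does this per task instance.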
Quickstart
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

# Ensure OPENLINEAGE_URL is set in your Airflow environment for events to be sent.
# For example: export OPENLINEAGE_URL="http://localhost:5000"
# If your OpenLineage backend requires authentication, also set OPENLINEAGE_API_KEY.

with DAG(
    dag_id="openlineage_example_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["openlineage", "example"],
) as dag:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo 'Starting OpenLineage example DAG'",
    )

    process_data = BashOperator(
        task_id="process_data_task",
        bash_command="""
        echo "Simulating data processing..."
        # In a real scenario, this would interact with data sources (e.g., SQL, Spark).
        # The OpenLineage Airflow plugin automatically captures dataset information
        # from supported operators and frameworks. For example, a SQL task:
        #   from airflow.providers.postgres.operators.postgres import PostgresOperator
        #   PostgresOperator(task_id='insert_data', sql='INSERT INTO output_table SELECT * FROM input_table;')
        # To run a single task for testing:
        #   airflow tasks run openlineage_example_dag process_data_task 2023-01-01
        sleep 5
        echo "Data processed!"
        """,
    )

    end_task = BashOperator(
        task_id="end_task",
        bash_command="echo 'OpenLineage example DAG finished'",
    )

    start_task >> process_data >> end_task
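To verify that events actually leave Airflow, you can point `OPENLINEAGE_URL` at a throwaway local receiver. Below is a debugging sketch using only the standard library; the port is an assumption, and it accepts POSTs on any path (the HTTP transport targets `/api/v1/lineage` under the configured URL).

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

received_events = []

class LineageHandler(BaseHTTPRequestHandler):
    """Collects any POSTed OpenLineage events so you can inspect them."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        received_events.append(json.loads(body))
        self.send_response(200)
        self.end_headers()

    def log_message(self, fmt, *args):
        pass  # silence per-request logging

def serve(port: int = 5000):
    # Set OPENLINEAGE_URL=http://localhost:<port>, start Airflow, run the DAG,
    # and inspect received_events to see the emitted lineage payloads.
    HTTPServer(("localhost", port), LineageHandler).serve_forever()
```

This is for local debugging only; in production you would point `OPENLINEAGE_URL` at a real collector such as Marquez.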