OpenLineage Airflow Provider
The OpenLineage Airflow Provider integrates Apache Airflow with OpenLineage, an open framework for data lineage collection and analysis. It automatically extracts metadata from Airflow DAGs and tasks, using Airflow listener hooks to send lineage events to an OpenLineage backend. The provider is currently at version 2.13.0 and follows the Apache Airflow providers support policy, with release cycles tied to Airflow's own release schedule, typically raising the minimum supported Airflow version roughly every 12 months.
Warnings
- breaking Provider version 2.0.0 introduced significant breaking changes. All previously deprecated classes, parameters, and features were removed. Notably, the `normalize_sql` function was removed from the `openlineage.utils` module. This version also increased the minimum supported Apache Airflow version to 2.9.0.
- gotcha When developing custom OpenLineage extractors, be aware of potential cyclical import issues if importing from Airflow modules. OpenLineage code is instantiated during Airflow worker startup, which differs from DAG code loading, leading to subtle circular import problems.
- gotcha Incorrectly specifying the path to custom extractors via the `extractors` option in `airflow.cfg` or the `AIRFLOW__OPENLINEAGE__EXTRACTORS` environment variable will prevent the extractor from loading. This results in OpenLineage events missing operator-specific lineage for affected tasks.
- breaking The OpenLineage integration for Airflow underwent a significant migration with Airflow 2.7+. For Airflow versions <2.7, the integration was an external package (`openlineage-airflow`). For Airflow 2.7 and newer, it is the official `apache-airflow-providers-openlineage` provider. The legacy `openlineage-airflow` is no longer actively maintained.
- gotcha If an Airflow operator does not have a corresponding OpenLineage extractor, or if an extractor cannot determine input/output datasets (e.g., from `inlets` and `outlets` that are not Airflow Assets or lack specific lineage methods), the OpenLineage events for that task may be empty regarding inputs/outputs and operator-specific facets. General Airflow facets will still be emitted.
- gotcha There are known issues with the OpenLineage provider when running with Airflow in standalone mode, particularly concerning the scheduler shutting down due to `OpenLineageListener` pickling failures. This issue has been observed with provider versions 1.8.0 and above. This is typically not reproducible in distributed Airflow environments (e.g., Breeze, Google Composer, Astro Cloud).
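To avoid the misconfigured-extractor-path gotcha above, register custom extractors with full, importable paths. The `extractors` option takes semicolon-separated import paths to extractor classes. A minimal sketch using the environment-variable form (the `my_company.lineage` module paths are hypothetical):

```python
import os

# Semicolon-separated import paths to custom extractor classes.
# Both module paths below are hypothetical examples; each must be
# importable on the Airflow workers, or the extractor will silently
# fail to load and operator-specific lineage will be missing.
os.environ["AIRFLOW__OPENLINEAGE__EXTRACTORS"] = (
    "my_company.lineage.MyOperatorExtractor;"
    "my_company.lineage.OtherOperatorExtractor"
)

# Equivalent airflow.cfg form:
# [openlineage]
# extractors = my_company.lineage.MyOperatorExtractor;my_company.lineage.OtherOperatorExtractor
print(os.environ["AIRFLOW__OPENLINEAGE__EXTRACTORS"])
```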
Install
pip install apache-airflow-providers-openlineage
Imports
- BaseExtractor
from airflow.providers.openlineage.extractors.base import BaseExtractor
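A custom extractor subclasses `BaseExtractor` and implements `get_operator_classnames()` plus `extract()` (and optionally `extract_on_complete()`), returning an `OperatorLineage`. The shape is sketched below as plain Python, without the Airflow imports, so it runs standalone; in a real deployment the class would inherit from `BaseExtractor`, and `MyOperator` is a hypothetical operator name:

```python
# Structural sketch of a custom extractor. A real extractor would subclass
# airflow.providers.openlineage.extractors.base.BaseExtractor and return an
# OperatorLineage instance instead of a plain dict.
class MyOperatorExtractor:
    def __init__(self, operator):
        # The provider passes in the operator instance being executed.
        self.operator = operator

    @classmethod
    def get_operator_classnames(cls):
        # Operator class names this extractor handles ("MyOperator" is hypothetical).
        return ["MyOperator"]

    def extract(self):
        # Called around task execution; return lineage metadata here.
        # With the real BaseExtractor this is an OperatorLineage with
        # inputs, outputs, run_facets, and job_facets.
        return {"inputs": [], "outputs": [], "run_facets": {}, "job_facets": {}}


extractor = MyOperatorExtractor(operator=None)
print(extractor.get_operator_classnames())
```

Keep extractor modules free of heavyweight Airflow imports at module top level where possible: as noted in the warnings, extractor code is loaded during worker startup, which makes circular imports easy to trigger.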
Quickstart
# 1. Install the provider (see 'install' section).
# 2. Configure the OpenLineage transport via environment variable or airflow.cfg.
# This example sends events to a local Marquez instance (http://localhost:5000).
import os
# Recommended method: Environment variable
os.environ['AIRFLOW__OPENLINEAGE__TRANSPORT'] = '{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
# Alternatively, add this to your airflow.cfg under the [openlineage] section:
# [openlineage]
# transport = {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}
# No changes to user DAG files are typically required for basic lineage collection.
# The provider automatically hooks into Airflow to extract metadata.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='openlineage_example_dag',
    start_date=datetime(2024, 1, 1),
    schedule=None,  # `schedule_interval` is deprecated; use `schedule`
    catchup=False,
    tags=['openlineage', 'example'],
) as dag:
    start_task = BashOperator(
        task_id='start_task',
        bash_command='echo "Starting lineage test..."',
    )
    process_data = BashOperator(
        task_id='process_data',
        bash_command='echo "Processing some data..." && sleep 5',
    )
    end_task = BashOperator(
        task_id='end_task',
        bash_command='echo "Lineage test complete!"',
    )
    start_task >> process_data >> end_task
print("OpenLineage Airflow Provider configured. Run an Airflow DAG to see lineage events.")
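For reference, the events emitted for a task like `process_data` (a BashOperator with no resolvable datasets) carry empty `inputs`/`outputs` while run and job metadata are still present. A minimal sketch of that event shape, built with only the standard library; field names follow the OpenLineage spec, and the namespace and producer values are illustrative:

```python
import json
import uuid
from datetime import datetime, timezone

# Minimal OpenLineage run event for a task whose extractor could not
# resolve any input/output datasets: "inputs" and "outputs" are empty,
# but run and job metadata are still emitted.
event = {
    "eventType": "COMPLETE",
    "eventTime": datetime.now(timezone.utc).isoformat(),
    "run": {"runId": str(uuid.uuid4())},
    "job": {
        "namespace": "default",  # illustrative namespace
        "name": "openlineage_example_dag.process_data",
    },
    "inputs": [],   # no extractor resolved input datasets
    "outputs": [],  # no extractor resolved output datasets
    "producer": "https://example.com/my-producer",  # illustrative producer URI
}

payload = json.dumps(event)
print(payload[:80])
```

With the HTTP transport configured in the quickstart, payloads of this shape are POSTed to the backend's lineage endpoint (here, the local Marquez instance).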