OpenLineage Python Client
OpenLineage Python Client is the official Python library for emitting OpenLineage events. It lets users send lineage metadata from Python code to an OpenLineage backend (such as Marquez) for data governance and observability. It is actively maintained with frequent releases, currently at version 1.45.0, and underpins integrations such as Airflow and dbt.
Warnings
- gotcha The OpenLineage client can be configured via an `openlineage.yml` file (looked up via the `OPENLINEAGE_CONFIG` env var, then the current working directory, then `$HOME/.openlineage`) or directly via environment variables such as `OPENLINEAGE_URL` and `OPENLINEAGE_API_KEY`. Environment variables typically override config-file settings for the HTTP transport.
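As an illustrative sketch of the file-based approach (host, port, and key are placeholders, and the exact key names should be checked against the client docs for your version), an `openlineage.yml` configuring the HTTP transport might look like:

```yaml
# Hypothetical openlineage.yml: send events to an HTTP backend with an API key
transport:
  type: http
  url: https://marquez.example.com:5000
  auth:
    type: api_key
    apiKey: your_api_key
```

Setting `OPENLINEAGE_URL` and `OPENLINEAGE_API_KEY` in the environment achieves the same for the HTTP transport and takes precedence over the file.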
- gotcha When using `openlineage-python` with the `apache-airflow-providers-openlineage` package, it's crucial to understand their roles: the Python client handles event transmission, while the Airflow provider extracts Airflow-specific metadata. The two can be upgraded independently, as the client has no dependency on any Airflow version.
- gotcha Lineage extraction for generic operators like `PythonOperator` or `KubernetesPodOperator` in Airflow might be limited due to their 'black box' nature. Full input/output dataset metadata may not be automatically captured.
- breaking Support for Spark 2.x versions was dropped in `openlineage-python` version 1.38.0. The minimum supported Spark version is now 3.x.
- gotcha The `KafkaTransport` will fail to initialize if the `confluent-kafka` package is not installed. This dependency is part of the `openlineage-python[kafka]` extra.
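As a sketch (topic name and broker address are placeholders), a Kafka transport might be configured in `openlineage.yml` like this; initializing it will fail unless `confluent-kafka` is importable:

```yaml
# Hypothetical openlineage.yml: emit events to a Kafka topic
transport:
  type: kafka
  topic: openlineage.events
  config:
    bootstrap.servers: localhost:9092
  flush: true
```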
Install
- pip install openlineage-python
- pip install openlineage-python[fsspec]
- pip install openlineage-python[kafka]
- pip install openlineage-python[msk-iam]
- pip install openlineage-python[datazone]
Imports
- OpenLineageClient
from openlineage.client.client import OpenLineageClient
- RunEvent
from openlineage.client.event_v2 import RunEvent
- RunState
from openlineage.client.event_v2 import RunState
- Job
from openlineage.client.event_v2 import Job
- InputDataset
from openlineage.client.event_v2 import InputDataset
- OutputDataset
from openlineage.client.event_v2 import OutputDataset
- Run
from openlineage.client.event_v2 import Run
Quickstart
import os
import uuid
from datetime import datetime, timezone

from openlineage.client.client import OpenLineageClient
from openlineage.client.event_v2 import (
    InputDataset,
    Job,
    OutputDataset,
    Run,
    RunEvent,
    RunState,
)
from openlineage.client.transport.console import ConsoleConfig, ConsoleTransport

# Producer URI identifying the code that emits these events
PRODUCER = "https://github.com/my-org/my-app"
NAMESPACE = os.environ.get("OPENLINEAGE_NAMESPACE", "my_app_namespace")

# If OPENLINEAGE_URL is set, the default client sends events to that HTTP
# backend; otherwise fall back to the console transport for demonstration.
# (OPENLINEAGE_URL must be an HTTP(S) URL; it cannot select other transports.)
if os.environ.get("OPENLINEAGE_URL"):
    client = OpenLineageClient()
else:
    client = OpenLineageClient(transport=ConsoleTransport(ConsoleConfig()))

def my_data_processing_job():
    job_name = "my_simple_job"
    run_id = str(uuid.uuid4())
    inputs = [InputDataset(namespace=NAMESPACE, name="input_data")]
    outputs = [OutputDataset(namespace=NAMESPACE, name="processed_data")]

    def run_event(state: RunState) -> RunEvent:
        # facets default to empty; schemaURL is filled in by event_v2
        return RunEvent(
            eventType=state,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=Run(runId=run_id),
            job=Job(namespace=NAMESPACE, name=job_name),
            inputs=inputs,
            outputs=outputs,
            producer=PRODUCER,
        )

    # 1. Emit START event
    client.emit(run_event(RunState.START))
    print(f"Emitted START event for job '{job_name}' with run ID '{run_id}'")
    try:
        # Simulate data processing; add actual processing logic here
        print(f"Processing data for job '{job_name}'...")
        # 2. Emit COMPLETE event on success
        client.emit(run_event(RunState.COMPLETE))
        print(f"Emitted COMPLETE event for job '{job_name}'")
    except Exception as e:
        print(f"Job '{job_name}' failed: {e}")
        # 3. Emit FAIL event on failure
        client.emit(run_event(RunState.FAIL))
        print(f"Emitted FAIL event for job '{job_name}'")

if __name__ == "__main__":
    my_data_processing_job()