OpenLineage Python Client
OpenLineage Python Client is the official Python library for interacting with the OpenLineage standard. It allows users to emit lineage metadata events from Python code to an OpenLineage backend (like Marquez) for data governance and observability. It is actively maintained with frequent releases, currently at version 1.45.0, and forms the basis for various integrations like Airflow and dbt.
Common errors
- scheduler shuts down after the attempt to pickle OpenLineageListener initializer fails
  - cause: The `openlineage-airflow` package is deprecated and incompatible with Apache Airflow 2.7.0 and later.
  - fix: For Airflow 2.7.0+, use the native `apache-airflow-providers-openlineage` package instead of `openlineage-airflow`.
- Program 'dbt-ol' failed to run: No application is associated with the specified file for this operation
  - cause: The `dbt-ol` executable, installed by `openlineage-dbt`, is not on the system's PATH, or the installation was incomplete or corrupted.
  - fix: Ensure `openlineage-dbt` is correctly installed and its executables are accessible, typically by activating the right virtual environment. Reinstall with `pip install openlineage-dbt` if necessary.
- ConnectionError: Max retries exceeded with URL
  - cause: The OpenLineage client cannot reach the configured backend, often because of an incorrect `OPENLINEAGE_URL` environment variable, a backend that is not running, or network/firewall issues.
  - fix: Verify that `OPENLINEAGE_URL` (or the `openlineage.yml` configuration) points to a running, reachable OpenLineage backend such as Marquez. Check network connectivity and firewall settings.
- AttributeError: module 'openlineage.client' has no attribute 'Job'
  - cause: Core data structures such as `Job`, `Run`, `Dataset`, and `RunEvent` live in specific submodules (e.g. `openlineage.client.event_v2` or the legacy `openlineage.client.run`), not directly under the top-level `openlineage.client` module.
  - fix: Import the classes from their submodules, for example: `from openlineage.client.event_v2 import Job, Run, Dataset, RunEvent`.
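A quick way to diagnose the `dbt-ol` PATH error above is to ask the shell where (or whether) it can find the executable; a stdlib-only check (the printed messages are our own wording):

```python
import shutil

# Look up the dbt-ol wrapper installed by openlineage-dbt on the current PATH.
dbt_ol_path = shutil.which("dbt-ol")

if dbt_ol_path is None:
    print("dbt-ol not found on PATH; activate the virtualenv where "
          "openlineage-dbt is installed, or reinstall it.")
else:
    print(f"dbt-ol found at {dbt_ol_path}")
```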
Warnings
- gotcha The OpenLineage client can be configured via `openlineage.yml` file (searched in `OPENLINEAGE_CONFIG` env var, CWD, or `$HOME/.openlineage`) or directly via environment variables like `OPENLINEAGE_URL` and `OPENLINEAGE_API_KEY`. Environment variables typically override config file settings for HTTP transport.
- gotcha When using `openlineage-python` together with the `apache-airflow-providers-openlineage` provider, it's crucial to understand their roles. The Python client (`openlineage-python`) handles event transmission, while the Airflow provider extracts Airflow-specific metadata. Keep both updated independently; the client has no Airflow version dependencies.
- gotcha Lineage extraction for generic operators like `PythonOperator` or `KubernetesPodOperator` in Airflow might be limited due to their 'black box' nature. Full input/output dataset metadata may not be automatically captured.
- breaking Support for Spark 2.x was dropped in OpenLineage release 1.38.0 (this affects the Spark integration rather than the Python client itself). The minimum supported Spark version is now 3.x.
- gotcha The `KafkaTransport` will fail to initialize if the `confluent-kafka` package is not installed. This dependency is part of the `openlineage-python[kafka]` extra.
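Because `KafkaTransport` fails at initialization when `confluent-kafka` is absent, a defensive check before configuring it can turn a confusing traceback into a clear message; a stdlib-only sketch:

```python
import importlib.util

# KafkaTransport requires the confluent-kafka package,
# shipped via the openlineage-python[kafka] extra.
has_confluent_kafka = importlib.util.find_spec("confluent_kafka") is not None

if not has_confluent_kafka:
    print("confluent-kafka is missing; install openlineage-python[kafka] "
          "before configuring a Kafka transport.")
```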
Install
- pip install openlineage-python
- pip install openlineage-python[fsspec]
- pip install openlineage-python[kafka]
- pip install openlineage-python[msk-iam]
- pip install openlineage-python[datazone]
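To confirm which version of the client (if any) is installed in the active environment, a stdlib-only check that works whether or not the package is present:

```python
from importlib.metadata import PackageNotFoundError, version

# Report the installed openlineage-python version, if present.
try:
    installed = version("openlineage-python")
except PackageNotFoundError:
    installed = None

print(installed or "openlineage-python is not installed")
```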
Imports
- OpenLineageClient
from openlineage.client.client import OpenLineageClient
- RunEvent (current API)
from openlineage.client.event_v2 import RunEvent
- RunEvent (legacy, deprecated)
from openlineage.client.run import RunEvent
- RunState
from openlineage.client.event_v2 import RunState
- Job
from openlineage.client.event_v2 import Job
- InputDataset
from openlineage.client.event_v2 import InputDataset
- OutputDataset
from openlineage.client.event_v2 import OutputDataset
Quickstart
```python
import os
import uuid
from datetime import datetime, timezone

from openlineage.client.client import OpenLineageClient
from openlineage.client.event_v2 import (
    InputDataset,
    Job,
    OutputDataset,
    Run,
    RunEvent,
    RunState,
)
from openlineage.client.transport.console import ConsoleConfig, ConsoleTransport

# Identifies the producer of these events; point this at your own code.
PRODUCER = "https://github.com/my-org/my-app"
NAMESPACE = os.environ.get("OPENLINEAGE_NAMESPACE", "my_app_namespace")

# Send events to stdout for demonstration. In production, configure an HTTP
# transport instead (e.g. via the OPENLINEAGE_URL environment variable).
client = OpenLineageClient(transport=ConsoleTransport(ConsoleConfig()))


def make_event(event_type, run_id, job_name):
    """Build a RunEvent describing one state transition of the run."""
    return RunEvent(
        eventType=event_type,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=run_id),
        job=Job(namespace=NAMESPACE, name=job_name),
        inputs=[InputDataset(namespace=NAMESPACE, name="input_data")],
        outputs=[OutputDataset(namespace=NAMESPACE, name="processed_data")],
        producer=PRODUCER,
    )


def my_data_processing_job():
    job_name = "my_simple_job"
    run_id = str(uuid.uuid4())

    # 1. Emit START event
    client.emit(make_event(RunState.START, run_id, job_name))
    print(f"Emitted START event for job '{job_name}' with run ID '{run_id}'")

    try:
        # Simulate data processing; add actual processing logic here.
        print(f"Processing data for job '{job_name}'...")

        # 2. Emit COMPLETE event on success
        client.emit(make_event(RunState.COMPLETE, run_id, job_name))
        print(f"Emitted COMPLETE event for job '{job_name}'")
    except Exception as e:
        print(f"Job '{job_name}' failed: {e}")

        # 3. Emit FAIL event on failure
        client.emit(make_event(RunState.FAIL, run_id, job_name))
        print(f"Emitted FAIL event for job '{job_name}'")


if __name__ == "__main__":
    my_data_processing_job()
```