OpenLineage Python Client

1.45.0 · active · verified Sun Apr 05

OpenLineage Python Client is the official Python library for working with the OpenLineage standard. It lets users emit lineage metadata events from Python code to an OpenLineage backend (such as Marquez) for data governance and observability. It is actively maintained with frequent releases, currently at version 1.45.0, and underpins integrations such as those for Airflow and dbt.

Warnings

Install
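
The client is published on PyPI as `openlineage-python`:

pip install openlineage-python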

Imports
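
The quickstart below uses the client class, the v2 event model, and the console transport (the transport import is only needed when printing events locally):

from openlineage.client.client import OpenLineageClient
from openlineage.client.event_v2 import InputDataset, Job, OutputDataset, Run, RunEvent, RunState
from openlineage.client.transport.console import ConsoleConfig, ConsoleTransport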

Quickstart

This quickstart demonstrates how to initialize the `OpenLineageClient` and manually emit `START` and `COMPLETE` (or `FAIL`) events for a data processing job. For demonstration it configures the console transport, which prints events to standard output, so you can inspect the generated lineage without running a full OpenLineage backend.

import os
import uuid
from datetime import datetime, timezone

from openlineage.client.client import OpenLineageClient
from openlineage.client.event_v2 import InputDataset, Job, OutputDataset, Run, RunEvent, RunState
from openlineage.client.transport.console import ConsoleConfig, ConsoleTransport

# Print events to standard output for demonstration; swap in an HTTP transport
# (or set OPENLINEAGE_URL) to send them to a backend such as Marquez.
client = OpenLineageClient(transport=ConsoleTransport(ConsoleConfig()))

# Placeholder producer URI identifying this application; replace with your own.
PRODUCER = "https://github.com/my-org/my_app"

def my_data_processing_job():
    job_name = "my_simple_job"
    run_id = str(uuid.uuid4())
    namespace = os.environ.get("OPENLINEAGE_NAMESPACE", "my_app_namespace")
    
    input_dataset_name = "input_data"
    output_dataset_name = "processed_data"

    # 1. Emit START event
    start_event = RunEvent(
        eventType=RunState.START,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=run_id, facets={}),
        job=Job(namespace=namespace, name=job_name, facets={}),
        inputs=[InputDataset(namespace=namespace, name=input_dataset_name)],
        outputs=[OutputDataset(namespace=namespace, name=output_dataset_name)],
        producer=PRODUCER,
    )
    client.emit(start_event)
    print(f"Emitted START event for job '{job_name}' with run ID '{run_id}'")

    try:
        # Simulate data processing
        print(f"Processing data for job '{job_name}'...")
        # Add actual processing logic here

        # 2. Emit COMPLETE event on success
        complete_event = RunEvent(
            eventType=RunState.COMPLETE,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=Run(runId=run_id, facets={}),
            job=Job(namespace=namespace, name=job_name, facets={}),
            inputs=[InputDataset(namespace=namespace, name=input_dataset_name)],
            outputs=[OutputDataset(namespace=namespace, name=output_dataset_name)],
            producer=PRODUCER,
        )
        client.emit(complete_event)
        print(f"Emitted COMPLETE event for job '{job_name}'")

    except Exception as e:
        print(f"Job '{job_name}' failed: {e}")
        # 3. Emit FAIL event on failure
        fail_event = RunEvent(
            eventType=RunState.FAIL,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=Run(runId=run_id, facets={}),
            job=Job(namespace=namespace, name=job_name, facets={}),
            inputs=[InputDataset(namespace=namespace, name=input_dataset_name)],
            outputs=[OutputDataset(namespace=namespace, name=output_dataset_name)],
            producer=PRODUCER,
        )
        client.emit(fail_event)
        print(f"Emitted FAIL event for job '{job_name}'")
        raise  # re-raise so the failure is still visible to the caller

if __name__ == "__main__":
    my_data_processing_job()
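
To send events to a running OpenLineage backend instead of the console, point the client at its HTTP API, either via the `url` argument or the `OPENLINEAGE_URL` environment variable. A minimal sketch, assuming a Marquez instance listening on localhost:5000:

# The URL is an assumption for a local Marquez deployment; adjust as needed.
client = OpenLineageClient(url="http://localhost:5000")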
