Dagster Pipes

1.12.22 · active · verified Thu Apr 09

dagster-pipes (version 1.12.22) is a toolkit for integrating external transform logic with Dagster, enabling communication between arbitrary external processes and the Dagster orchestration layer. It allows external programs to report events such as asset materializations, asset check results, and log messages back to Dagster via a lightweight, message-based protocol. The library is released in lockstep with Dagster core, so each dagster-pipes version is published alongside the Dagster core release that carries the same version number.
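
To make the message-based idea concrete, here is a minimal, standard-library-only sketch of such a protocol: the external side appends one JSON message per line to a file, and the orchestrating side reads them back. The function names (`write_message`, `read_messages`, `demo`) and the message fields (`method`, `params`) are assumptions made for illustration; this is not the dagster-pipes API or its wire format.

```python
import json
import tempfile
from pathlib import Path


def write_message(channel: Path, method: str, params: dict) -> None:
    """External side: append one JSON-encoded message per line."""
    with channel.open("a", encoding="utf-8") as f:
        f.write(json.dumps({"method": method, "params": params}) + "\n")


def read_messages(channel: Path) -> list[dict]:
    """Orchestrator side: parse every message written so far."""
    with channel.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def demo() -> list[dict]:
    """Write a log message and a materialization report, then read both back."""
    with tempfile.TemporaryDirectory() as tmp:
        channel = Path(tmp) / "messages.jsonl"
        channel.touch()
        write_message(channel, "log", {"level": "INFO", "message": "starting"})
        write_message(
            channel,
            "report_asset_materialization",
            {"asset_key": "my_external_data", "metadata": {"num_rows": 42}},
        )
        return read_messages(channel)


if __name__ == "__main__":
    for message in demo():
        print(message["method"], message["params"])
```

Keeping each message a self-describing JSON line means the two sides only need to agree on a file (or stream) location, which is what keeps this style of protocol lightweight.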

Warnings

Install

Imports

Quickstart

This quickstart defines a Dagster asset that runs a Python script in an external process through `PipesSubprocessClient`. The external script calls `open_dagster_pipes` to obtain a Pipes context and uses it to report an asset materialization and log messages back to Dagster, illustrating the core communication pattern. The client handles passing the necessary context (asset key, partition key, extras) to the external process.

import sys
from pathlib import Path

from dagster import (
    AssetExecutionContext,
    Definitions,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
)

# --- external_process.py ---
# This script is launched in a subprocess by PipesSubprocessClient.
# In a real project it would be a separate file, e.g. 'external_process.py';
# it is embedded here so the example fits in one module.
EXTERNAL_PROCESS_SCRIPT_CONTENT = """
from dagster_pipes import open_dagster_pipes

def main():
    # Load the context injected by Dagster and open the message channel
    with open_dagster_pipes() as pipes:
        # Simulate some external work
        pipes.log.info(f"Running external process for asset: {pipes.asset_key}")

        # Report an asset materialization back to Dagster.
        # The asset key and partition come from the Pipes context,
        # so they do not need to be read from environment variables.
        pipes.report_asset_materialization(
            metadata={
                "num_rows": 42,
                "source_path": "s3://my-bucket/data.csv",
            },
        )
        pipes.log.info("External process completed successfully.")

if __name__ == "__main__":
    main()
"""

# Create the external script file next to this module for demonstration
SCRIPT_PATH = Path(__file__).with_name("external_process.py")
SCRIPT_PATH.write_text(EXTERNAL_PROCESS_SCRIPT_CONTENT)

# --- definitions.py ---

# Define a Dagster asset that delegates its execution to an external process
@asset
def my_external_data(
    context: AssetExecutionContext,
    pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
    # The client injects the Pipes context into the subprocess environment
    # and collects the messages the script reports back.
    return pipes_subprocess_client.run(
        command=[sys.executable, str(SCRIPT_PATH)],
        context=context,
    ).get_materialize_result()

defs = Definitions(
    assets=[my_external_data],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)

# To run this in a Dagster environment:
# 1. Save the Python code above as, e.g., 'my_project.py'
# 2. Run 'dagster dev -f my_project.py'
# 3. In the Dagster UI, materialize 'my_external_data'.
#    The 'external_process.py' script will be executed and report its status.
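
The quickstart relies on the orchestrator handing context to the external process and reading messages back. The round trip can be sketched with the standard library alone: the parent encodes a small context into an environment variable, launches a child interpreter, and parses the JSON lines the child prints. Everything named here (`SKETCH_PIPES_CONTEXT`, `run_external`, the message fields) is hypothetical and chosen for illustration; the real library uses its own variable names, encoding, and message channels.

```python
import base64
import json
import os
import subprocess
import sys

# Hypothetical variable name, chosen for this sketch only.
CONTEXT_ENV_VAR = "SKETCH_PIPES_CONTEXT"

# Code run by the child process: decode the injected context, then emit
# one JSON message on stdout describing a materialization.
CHILD_CODE = """
import base64, json, os
ctx = json.loads(base64.b64decode(os.environ["SKETCH_PIPES_CONTEXT"]))
print(json.dumps({
    "method": "report_asset_materialization",
    "params": {"asset_key": ctx["asset_key"], "metadata": {"num_rows": 42}},
}))
"""


def run_external(asset_key: str) -> list[dict]:
    """Launch the child with an encoded context and collect its messages."""
    payload = json.dumps({"asset_key": asset_key}).encode("utf-8")
    encoded = base64.b64encode(payload).decode("ascii")
    env = {**os.environ, CONTEXT_ENV_VAR: encoded}
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        env=env,
        capture_output=True,
        text=True,
        check=True,  # raise if the child exits with a non-zero status
    )
    return [json.loads(line) for line in completed.stdout.splitlines() if line.strip()]


if __name__ == "__main__":
    for message in run_external("my_external_data"):
        print(message)
```

Passing context through the environment keeps the child's command line unchanged, which is why the external script needs no Dagster-specific arguments.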
