Dagster Pipes
dagster-pipes (version 1.12.22) is a toolkit for integrating external transform logic with Dagster. It lets arbitrary external processes communicate with the Dagster orchestration layer, reporting events such as asset materializations, observations, and logs back to Dagster over a lightweight, message-based protocol. The library is released in lockstep with Dagster core, so each new version is published alongside the corresponding Dagster core release.
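To make the idea of a lightweight, message-based protocol concrete, here is a toy sketch that uses only the Python standard library. It is not Dagster's actual wire format: the environment variable `DEMO_MESSAGES_PATH`, the message fields, and the `run_child` helper are all hypothetical, chosen only to show a parent process handing a child a channel and reading structured events back.

```python
import json
import os
import subprocess
import sys
import tempfile

# Child program: appends one JSON message per line to the file named by a
# hypothetical environment variable. This is NOT the real Pipes wire format.
CHILD_CODE = """
import json, os

def send(method, params):
    with open(os.environ["DEMO_MESSAGES_PATH"], "a") as f:
        f.write(json.dumps({"method": method, "params": params}) + "\\n")

send("log", {"level": "INFO", "message": "starting work"})
send("report_asset_materialization", {"metadata": {"num_rows": 42}})
"""


def run_child() -> list[dict]:
    """Launch the child and return the messages it wrote, in order."""
    with tempfile.TemporaryDirectory() as tmp:
        messages_path = os.path.join(tmp, "messages.jsonl")
        # Hand the channel location to the child through its environment.
        env = {**os.environ, "DEMO_MESSAGES_PATH": messages_path}
        subprocess.run([sys.executable, "-c", CHILD_CODE], env=env, check=True)
        with open(messages_path) as f:
            return [json.loads(line) for line in f if line.strip()]


if __name__ == "__main__":
    for message in run_child():
        print(message)
```

The parent never imports the child's code; it only agrees with it on where messages go and what shape they take, which is the property that lets the external side run in a different environment.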
Warnings
- breaking The dagster-pipes version must match the version of your dagster core package. Mismatched versions can lead to serialization errors, API incompatibilities, or unexpected behavior.
- gotcha The external process launched by a Pipes client (for example `PipesSubprocessClient` from the `dagster` package) runs in its own environment. If your external logic (e.g., a Python script) calls `open_dagster_pipes`, then `dagster-pipes` must be installed and importable within that external environment as well.
- gotcha `open_dagster_pipes()` in the external process builds a `PipesContext` from bootstrap parameters in order to communicate with Dagster. The launching Pipes client injects these parameters automatically, by default through the `DAGSTER_PIPES_CONTEXT` and `DAGSTER_PIPES_MESSAGES` environment variables.
- gotcha dagster-pipes is designed for event emission (materializations, observations, logs) and metadata reporting, not for direct, synchronous data transfer between the external process and Dagster's core execution or other assets.
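The last point suggests a common pattern: the external process writes its output to storage itself and reports only a small description of it. A minimal sketch, assuming a local CSV file stands in for real storage; `write_rows` and `report_to_dagster` are hypothetical helper names, while `open_dagster_pipes` and `report_asset_materialization` come from `dagster_pipes`.

```python
import csv
import os
import tempfile


def write_rows(rows: list[dict], path: str) -> dict:
    """Write rows to a CSV file and return a small metadata summary."""
    fieldnames = list(rows[0].keys()) if rows else []
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    # Only this summary travels over Pipes; the data itself stays in storage.
    return {"num_rows": len(rows), "source_path": path}


def report_to_dagster(metadata: dict) -> None:
    """Send the summary to Dagster; call this only from a Pipes-launched process."""
    # Imported lazily so write_rows can be used and tested without dagster-pipes.
    from dagster_pipes import open_dagster_pipes

    with open_dagster_pipes() as pipes:
        pipes.report_asset_materialization(metadata=metadata)


if __name__ == "__main__":
    out_path = os.path.join(tempfile.mkdtemp(), "data.csv")
    summary = write_rows([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}], out_path)
    print(summary)
```

Downstream assets would then read the file from `source_path` through their own I/O, rather than expecting the rows to arrive through Pipes.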
Install
pip install dagster-pipes
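Because the first warning ties the dagster-pipes version to the dagster core version, it can help to check an environment after installing. A minimal sketch using `importlib.metadata` from the standard library; `installed_version` and `versions_match` are hypothetical helper names, and treating exact string equality as a match is an assumption.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def versions_match(a: Optional[str], b: Optional[str]) -> bool:
    """True only when both versions are known and identical (an assumed rule)."""
    return a is not None and a == b


if __name__ == "__main__":
    core = installed_version("dagster")
    pipes = installed_version("dagster-pipes")
    print(f"dagster={core} dagster-pipes={pipes} match={versions_match(core, pipes)}")
```

This check is only meaningful in an environment where both packages are installed, typically the orchestration side; an external environment may contain dagster-pipes alone.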
Imports
- open_dagster_pipes
from dagster_pipes import open_dagster_pipes
- PipesContext
from dagster_pipes import PipesContext
- PipesSubprocessClient (orchestration side, provided by the dagster package)
from dagster import PipesSubprocessClient
Quickstart
from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset

# --- external_process.py ---
# This script is launched by PipesSubprocessClient and needs only dagster-pipes installed.
# In a real scenario, this would be a separate file, e.g., 'external_process.py'
EXTERNAL_PROCESS_SCRIPT_CONTENT = """
from dagster_pipes import open_dagster_pipes


def main():
    # open_dagster_pipes() reads the bootstrap parameters that Dagster injected
    # into this process and opens the message channel back to Dagster.
    with open_dagster_pipes() as pipes:
        # Simulate some external work
        pipes.log.info(f"Running external process for asset: {pipes.asset_key}")
        # Report an asset materialization back to Dagster. The asset key and
        # partition key come from the Pipes context, so they are not passed here.
        pipes.report_asset_materialization(
            metadata={
                "num_rows": 42,
                "source_path": "s3://my-bucket/data.csv",
            },
        )
        pipes.log.info("External process completed successfully.")


if __name__ == "__main__":
    main()
"""

# Create the external script file for demonstration
with open("external_process.py", "w") as f:
    f.write(EXTERNAL_PROCESS_SCRIPT_CONTENT)


# --- definitions.py ---
# Define a Dagster asset that delegates its execution to an external process
@asset
def my_external_data(
    context: AssetExecutionContext,
    pipes_subprocess_client: PipesSubprocessClient,
):
    # run() launches the command, injects the Pipes bootstrap parameters
    # (DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES) into its environment,
    # and collects the events the external process reports.
    return pipes_subprocess_client.run(
        command=["python", "external_process.py"],
        context=context,
    ).get_materialize_result()


defs = Definitions(
    assets=[my_external_data],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)

# To run this in a Dagster environment:
# 1. Save the Python code above as, e.g., 'my_project.py'
# 2. Run 'dagster dev -f my_project.py'
# 3. In the Dagster UI, materialize 'my_external_data'.
# The 'external_process.py' script will be executed and report its status.