{"id":1815,"library":"dagster-pipes","title":"Dagster Pipes","description":"dagster-pipes (version 1.12.22) is a lightweight, dependency-free toolkit for integrating external transform logic with Dagster, enabling communication between arbitrary external processes and the Dagster orchestration layer. It allows external programs to report events such as asset materializations, asset check results, and logs back to Dagster via a message-based protocol. The library is released in lockstep with Dagster core, so each Dagster release has a matching dagster-pipes version.","status":"active","version":"1.12.22","language":"en","source_language":"en","source_url":"https://github.com/dagster-io/dagster/tree/master/python_modules/dagster-pipes","tags":["etl","data orchestration","data pipeline","dagster","external process","pipes","integration"],"install":[{"cmd":"pip install dagster-pipes","lang":"bash","label":"Install dagster-pipes"}],"dependencies":[{"reason":"Not a runtime dependency of dagster-pipes, which has no third-party dependencies so that it can be installed in any external environment. `dagster` is required in the orchestration environment that launches the external process through a Pipes client such as `PipesSubprocessClient`; `dagster` itself depends on `dagster-pipes`.","package":"dagster","optional":true}],"imports":[{"symbol":"open_dagster_pipes","correct":"from dagster_pipes import open_dagster_pipes"},{"symbol":"PipesContext","correct":"from dagster_pipes import PipesContext"},{"symbol":"PipesSubprocessClient","correct":"from dagster import PipesSubprocessClient"}],"quickstart":{"code":"import sys\n\nfrom dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset\n\n# --- external_process.py ---\n# In a real project this is a separate file; it only needs `dagster-pipes` installed.\nEXTERNAL_PROCESS_SCRIPT_CONTENT = \"\"\"\nfrom dagster_pipes import open_dagster_pipes\n\ndef main():\n    # open_dagster_pipes() loads the context injected by the Pipes client\n    # and opens the message channel back to Dagster.\n    with open_dagster_pipes() as pipes:\n        # Simulate some external work\n        pipes.log.info(f\"Running external process for asset: {pipes.asset_key}\")\n\n        # Report an asset materialization back to Dagster\n        pipes.report_asset_materialization(\n            metadata={\n                \"num_rows\": 42,\n                \"source_path\": \"s3://my-bucket/data.csv\",\n            },\n        )\n        pipes.log.info(\"External process completed successfully.\")\n\nif __name__ == \"__main__\":\n    main()\n\"\"\"\n\n# Create the external script file for demonstration\nwith open(\"external_process.py\", \"w\") as f:\n    f.write(EXTERNAL_PROCESS_SCRIPT_CONTENT)\n\n# --- definitions.py ---\n\n# Define a Dagster asset that delegates its execution to an external process\n@asset\ndef my_external_data(\n    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient\n):\n    # The client injects the run context (asset key, partition key, extras) into the\n    # subprocess and collects the events it reports.\n    return pipes_subprocess_client.run(\n        command=[sys.executable, \"external_process.py\"],\n        context=context,\n    ).get_materialize_result()\n\n\ndefs = Definitions(\n    assets=[my_external_data],\n    resources={\"pipes_subprocess_client\": PipesSubprocessClient()},\n)\n\n# To run this in a Dagster environment:\n# 1. Save the Python code above as, e.g., 'my_project.py'\n# 2. Run 'dagster dev -f my_project.py'\n# 3. In the Dagster UI, materialize 'my_external_data'.\n#    The 'external_process.py' script will be executed and report its status.\n","lang":"python","description":"This quickstart defines a Dagster asset that uses `PipesSubprocessClient` to run a Python script as a subprocess. The script calls `open_dagster_pipes()` and reports an asset materialization with metadata back to Dagster, illustrating the core communication pattern. The Pipes client passes the run context (such as the asset key) to the external process and converts the reported events into the asset's `MaterializeResult`."},"warnings":[{"fix":"Always install `dagster-pipes` with the same major.minor.patch version as your `dagster` package (e.g., `dagster==1.12.22` and `dagster-pipes==1.12.22`).","message":"dagster-pipes versions must match your dagster core package version. Mismatched versions can lead to serialization errors, API incompatibilities, or unexpected behavior.","severity":"breaking","affected_versions":"All versions"},{"fix":"Ensure that every dependency of the external code, including `dagster-pipes` itself, is installed for the interpreter or image named in the `command` passed to the Pipes client (for example, in a Docker image or virtual environment). Because `dagster-pipes` has no third-party dependencies, adding it to that environment is lightweight.","message":"The external process launched by a Pipes client (such as `PipesSubprocessClient`) runs in its own environment. If your external logic (e.g., a Python script) imports `dagster_pipes`, then `dagster-pipes` must be installed and importable in that external environment as well.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Within your external Python script, call `open_dagster_pipes()` (ideally as a context manager: `with open_dagster_pipes() as pipes:`) before reporting any events. Elsewhere in the same process, retrieve the already-opened context with `PipesContext.get()`.","message":"The external process needs a `PipesContext` to communicate with Dagster. The Pipes client injects the bootstrap parameters (by default through environment variables), and `open_dagster_pipes()` loads them, builds the context, and opens the message channel. Used as a context manager, it also closes the channel when the work finishes.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Use `PipesContext` methods such as `report_asset_materialization` and `report_asset_check`, along with the `log` logger, to communicate status and metadata about the external work. 
For actual data transfer between assets or systems, rely on established storage mechanisms (e.g., S3, databases, GCS) that your external process and other Dagster assets can both access.","message":"dagster-pipes is designed for event emission (materializations, observations, logs) and metadata reporting, not for direct, synchronous data transfer between the external process and Dagster's core execution or other assets.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-09T00:00:00.000Z","next_check":"2026-07-08T00:00:00.000Z"}