Dagster Shell

0.25.13 · active · verified Thu Apr 16

dagster-shell is a Python package in the Dagster ecosystem for executing shell commands and scripts as operations within Dagster workflows. Although the library is published at version 0.25.13, its core functionality for creating shell ops is deprecated in favor of `PipesSubprocessClient` in the main `dagster` package. The broader Dagster platform maintains an active release cadence, with minor versions (which may include breaking changes) approximately every 12 weeks and patch releases weekly.

Quickstart

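This quickstart uses `PipesSubprocessClient` from the main `dagster` package to execute a shell command as a Dagster asset. This is the recommended approach because the shell ops in `dagster-shell` are deprecated. `PipesSubprocessClient` launches the command as a subprocess, forwards its stdout and stderr to the Dagster process, and fails the asset if the command exits with a non-zero code. Structured logs and events are reported back to Dagster only when the subprocess itself uses the `dagster-pipes` library.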

import dagster as dg

@dg.asset
def run_shell_command_asset(context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient):
    # Example: Run a simple echo command
    # The command is executed by PipesSubprocessClient, which pipes logs back to Dagster.
    command = ["bash", "-c", "echo 'Hello from Dagster Pipes!' && sleep 1 && echo 'Done!'"]
    
    # For a shell script file, you'd specify its path:
    # shell_script_path = "./my_script.sh"
    # with open(shell_script_path, "w") as f:
    #     f.write("#!/bin/bash\nset -eux\necho 'Executing my_script.sh'\n")
    # command = ["bash", shell_script_path]

    # Run the command. A non-zero exit code raises an error and fails the asset,
    # so there is no return code to inspect on success.
    return pipes_subprocess_client.run(
        command=command, context=context
    ).get_materialize_result()

definitions = dg.Definitions(
    assets=[run_shell_command_asset],
    resources={
        "pipes_subprocess_client": dg.PipesSubprocessClient(),
        # pipes_subprocess_client can be configured, e.g., to pass environment variables
        # "pipes_subprocess_client": dg.PipesSubprocessClient(env={'MY_VAR': 'my_value'})
    },
)
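
The `command` passed to `pipes_subprocess_client.run` above is an argument list, not a single shell string. As a minimal sketch of the two forms the quickstart mentions (an inline `bash -c` script and a script file), the helpers below build such lists using only the standard library. The names `bash_inline`, `bash_script`, and `describe` are hypothetical and chosen for illustration; they are not part of Dagster.

```python
import shlex
from typing import List, Sequence


def bash_inline(script: str) -> List[str]:
    """Build an argument list that runs an inline script via `bash -c`."""
    # The whole script is a single argv element, so it needs no extra quoting here.
    return ["bash", "-c", script]


def bash_script(path: str, *args: str) -> List[str]:
    """Build an argument list that runs a script file with positional arguments."""
    return ["bash", path, *args]


def describe(command: Sequence[str]) -> str:
    """Render an argument list as a shell-quoted string, e.g. for a log message."""
    return shlex.join(command)


# Hypothetical example values mirroring the quickstart.
inline = bash_inline("echo 'Hello from Dagster Pipes!' && sleep 1 && echo 'Done!'")
script = bash_script("./my_script.sh", "first arg", "second")

print(describe(script))
```

Either list can be passed as `command=` in the asset above. Keeping the command as a list avoids a second layer of shell quoting for arguments that contain spaces.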
