Dagster Shell
dagster-shell is a Python package in the Dagster ecosystem that runs shell commands and scripts as operations within Dagster workflows. The library is at version 0.25.13, but its core functions for creating shell ops are deprecated in favor of `PipesSubprocessClient` in the main `dagster` package. The broader Dagster platform keeps an active release cadence: minor versions, which may include breaking changes, roughly every 12 weeks, plus weekly patch releases.
Common errors
-
ImportError: cannot import name 'create_shell_command_op' from 'dagster_shell'
cause Attempting to import the deprecated functions `create_shell_command_op` or `create_shell_script_op` from the `dagster-shell` library after Dagster version 1.10. These functions have been removed.
fix Update your code to use `PipesSubprocessClient` from the `dagster` package instead. The import is `from dagster import PipesSubprocessClient`; configure it as a resource and call its `run` method.
-
RuntimeError: preexec_fn is not supported on Windows platforms
cause The underlying `subprocess` call used by `dagster-shell` passes `preexec_fn`, a Unix-specific feature, on a Windows operating system.
fix Run your Dagster instance and code in a Linux-based environment or container. If a Windows environment is unavoidable, execute shell commands in a way that does not rely on `preexec_fn`, or implement custom execution logic.
-
Shell command execution failed with output: [output_logs] and return code: 127 (or other non-zero code), typically for binary executables or complex commands.
cause The shell command or binary executable could not be found, or its arguments were misinterpreted, possibly due to encoding issues or incorrect paths. Return code 127 usually indicates 'command not found'.
fix Verify that the executable or script is on the system's PATH, or provide its absolute path. Ensure complex arguments are properly quoted. If the command involves a binary, consider wrapping it in a simple bash script to control execution and argument passing explicitly.
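The PATH check suggested for return code 127 can be done before the command is run. The following is a minimal sketch using only the Python standard library; `resolve_executable` is a hypothetical helper name and is not part of `dagster` or `dagster-shell`.

```python
import shutil
from typing import List


def resolve_executable(command: List[str]) -> List[str]:
    """Return the command with its first element replaced by a resolved path.

    Hypothetical helper: raises FileNotFoundError when the executable cannot be
    found, the situation a shell usually reports as return code 127.
    """
    if not command:
        raise ValueError("command must not be empty")
    # shutil.which searches PATH; a path with a directory part is checked directly.
    resolved = shutil.which(command[0])
    if resolved is None:
        raise FileNotFoundError(f"executable not found: {command[0]!r}")
    return [resolved, *command[1:]]
```

Passing the returned list on as the command means a missing binary fails early with a clear Python exception instead of a non-zero exit code buried in logs.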
Warnings
- breaking The `create_shell_command_op` and `create_shell_script_op` functions from `dagster-shell` are deprecated as of Dagster 1.10 and should no longer be used. Use `PipesSubprocessClient` from the `dagster` package instead.
- gotcha When specifying environment variables for shell commands, older versions of `dagster-shell` (e.g., pre-1.0 and some 0.15.x versions) might *override* the entire environment instead of merging with the parent process's environment.
- gotcha `dagster-shell` (and underlying Python `subprocess` calls) may encounter issues on Windows due to `preexec_fn` not being supported. This can manifest as `RuntimeError` or unexpected behavior.
- gotcha Executing binary files or commands with complex arguments via `dagster-shell` might lead to 'not-found' errors or unexpected behavior if UTF-8 encoding is implicitly applied during command interpretation.
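To guard against the environment-override gotcha above, the environment can be merged explicitly before it is handed to a subprocess. This is a minimal standard-library sketch; `merged_env` is a hypothetical helper name, and how any particular `dagster-shell` version treats the `env` it receives is not assumed here.

```python
import os
from typing import Dict, Mapping, Optional


def merged_env(
    overrides: Mapping[str, str],
    base: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Return a copy of the base environment with the overrides applied on top.

    When no base is given, the current process environment is used, so
    variables such as PATH survive even if the callee replaces the environment.
    """
    env = dict(os.environ if base is None else base)
    env.update(overrides)
    return env
```

The resulting dict can be passed wherever a complete `env` mapping is accepted, for example `subprocess.run(command, env=merged_env({"MY_VAR": "my_value"}))`. Because the helper copies the mapping, `os.environ` itself is left unchanged.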
Install
-
pip install dagster-shell
Imports
- PipesSubprocessClient
deprecated: from dagster_shell import create_shell_command_op
recommended: from dagster import PipesSubprocessClient
Quickstart
import dagster as dg


@dg.asset
def run_shell_command_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient,
) -> dg.MaterializeResult:
    # Example: run a simple echo command.
    # PipesSubprocessClient executes the command and forwards its stdout/stderr.
    command = ["bash", "-c", "echo 'Hello from Dagster Pipes!' && sleep 1 && echo 'Done!'"]

    # For a shell script file, you'd specify its path:
    # shell_script_path = "./my_script.sh"
    # with open(shell_script_path, "w") as f:
    #     f.write("#!/bin/bash\nset -eux\necho 'Executing my_script.sh'\n")
    # command = ["bash", shell_script_path]

    # Run the command. A non-zero exit code raises an error and fails the asset,
    # so there is no return code to inspect when the call succeeds.
    result = pipes_subprocess_client.run(command=command, context=context).get_materialize_result()
    context.log.info("Shell command completed successfully")
    return result


definitions = dg.Definitions(
    assets=[run_shell_command_asset],
    resources={
        "pipes_subprocess_client": dg.PipesSubprocessClient(),
        # pipes_subprocess_client can be configured, e.g., to pass environment variables
        # "pipes_subprocess_client": dg.PipesSubprocessClient(env={'MY_VAR': 'my_value'})
    },
)
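The commented-out script variant in the quickstart writes a bash file and then runs it with `bash <path>`. A minimal sketch of that preparation step, using only the standard library, might look like the following; `write_shell_script` is a hypothetical helper name, and the `set -eux` header simply mirrors the quickstart comment.

```python
import os
import stat
from typing import List


def write_shell_script(body: str, directory: str) -> List[str]:
    """Write a bash script into `directory` and return a command list that runs it."""
    path = os.path.join(directory, "my_script.sh")
    # newline="\n" keeps Unix line endings even when the file is written on Windows.
    with open(path, "w", encoding="utf-8", newline="\n") as f:
        f.write("#!/bin/bash\nset -eux\n" + body + "\n")
    # Mark the script executable for its owner; not required when run via `bash <path>`.
    os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR)
    return ["bash", path]
```

The returned list has the same shape as the `command` value in the quickstart, so it could be passed to `pipes_subprocess_client.run(command=..., context=context)` in its place.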