Prefect Shell
Prefect Shell provides integrations for executing shell commands within Prefect flows. It allows users to embed shell scripts and commands directly into their data pipelines, leveraging Prefect's orchestration capabilities like logging, retries, and observability. This library is part of the Prefect ecosystem, currently at version 0.3.5, and is actively maintained with regular updates as part of the broader Prefect project.
Warnings
- breaking Prefect 2.x changed the core API from Prefect 1.x. `prefect-shell` is designed specifically for Prefect 2 and is not compatible with Prefect 1. Ensure your Prefect core installation is version 2.0 or higher.
- gotcha When using `ShellOperation`, calling `.run()` is suitable for short, synchronous commands. For long-running processes or when needing to manage the subprocess lifecycle (e.g., waiting for completion, capturing output), use the context manager pattern (`with ShellOperation(...)`) combined with `.trigger()` and `.wait_for_completion()`, followed by `.fetch_result()` to get output. Directly calling `.run()` inside a flow will execute synchronously and may block.
- gotcha To use `ShellOperation` as a Prefect block type (e.g., for creating and managing shell commands directly in the Prefect UI), you must explicitly register the blocks after installation.
Install
-
pip install "prefect[shell]" -
pip install prefect-shell
Imports
- ShellOperation
from prefect_shell import ShellOperation
- flow
from prefect import flow
Quickstart
from prefect import flow
from prefect_shell import ShellOperation
@flow
def run_simple_command():
# For short-running operations, use the .run() method
result = ShellOperation(commands=["echo Hello from Prefect Shell!"]).run()
print(f"Command output: {result.stdout.strip()}")
# For longer-running operations or capturing structured output, use a context manager
with ShellOperation(
commands=["ls -l /tmp"],
working_dir="/tmp"
) as list_files_operation:
process = list_files_operation.trigger()
process.wait_for_completion()
output_lines = process.fetch_result()
print("\nFiles in /tmp:")
for line in output_lines:
print(line.strip())
if __name__ == "__main__":
run_simple_command()