Prefect Workflow Orchestration
Prefect is an open-source workflow orchestration system for building, running, and monitoring data pipelines. Workflows are defined as Python code, with built-in support for task dependencies, retries, caching, and state management. As of this writing, the current stable version is 3.6.25; nightly development builds are published frequently, and stable releases typically ship every few weeks.
Warnings
- breaking Prefect 2 was a complete rewrite and is NOT backward compatible with Prefect 1; the current 3.x line continues the Prefect 2 API, with some breaking changes of its own. Code written for Prefect 1 will not run on Prefect 2 or 3 without significant modification: the API changed entirely, and key concepts such as Prefect 1's 'projects' were replaced by 'deployments'.
- gotcha Calling `my_flow()` runs the flow immediately in the current process. Prefect still records the run's state and applies retries, but nothing is scheduled and the run cannot be triggered remotely. Scheduling, remote triggering, and execution by workers require a deployment, created either with `flow.to_deployment()` passed to `serve()` (or with `flow.serve()`), or with the `prefect deploy` CLI.
- gotcha Installing with `pip install prefect` does not start the Prefect API server or UI. For a local server and UI, run `prefect server start` (the UI is served at http://localhost:4200) and point clients at it via `PREFECT_API_URL`. `serve()` only runs the deployments passed to it; it is not a substitute for `prefect server start`. To use Prefect Cloud instead, run `prefect cloud login` or set `PREFECT_API_URL` and `PREFECT_API_KEY`.
- gotcha Changing a flow's code does not update a deployment that is already running. With `serve()`, restart the serving process so it loads the new code; with `prefect deploy`, re-run the deploy command (or update the code wherever the deployment loads it from) so the new definition is picked up. Saving the Python file alone is not enough.
Install
-
pip install prefect
Imports
- flow
from prefect import flow
- task
from prefect import task
- serve
from prefect import serve
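- Deployment (Prefect 2 only; removed in Prefect 3, which uses `flow.to_deployment()`, `flow.deploy()`, or `prefect deploy` instead)
from prefect.deployments import Deployment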
Quickstart
from prefect import flow, task, serve


@task
def greet_task(name: str):
    """A simple task that prints a greeting."""
    print(f"Hello from task, {name}!")
    return f"Task processed: {name}"


@flow(log_prints=True)
def my_orchestrated_flow(name: str = "World"):
    """A flow that calls greet_task and prints its result."""
    print(f"Flow starting for {name}...")
    task_result = greet_task(name)
    print(f"Flow received result: {task_result}")
    return f"Flow completed for {name}"


if __name__ == "__main__":
    # Runs are reported to the API named by PREFECT_API_URL:
    # - Local: run `prefect server start` in another terminal (UI at http://localhost:4200),
    #   then `prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api`.
    # - Prefect Cloud: run `prefect cloud login`, or export in your shell
    #   PREFECT_API_URL='https://api.prefect.cloud/api/accounts/.../workspaces/...'
    #   and PREFECT_API_KEY='pf_...'.
    # serve() registers the deployment with that API, then keeps running, polling for
    # scheduled runs and executing them on this machine. It does not start the API server or UI.
    print("Serving 'my_orchestrated_flow' (a run every 10 seconds). Press Ctrl+C to stop.")
    serve(my_orchestrated_flow.to_deployment(name="my-first-deployment", interval=10))
    # To run the flow once, in-process, with no deployment or schedule:
    # my_orchestrated_flow("Local User")