Prefect Workflow Orchestration

3.6.25 · active · verified Thu Apr 09

Prefect is an open-source workflow orchestration and management system that allows users to build, run, and monitor data pipelines. It provides a robust framework for defining workflows as Python code, complete with task dependencies, retries, caching, and state management. The current stable version is 3.6.25, with frequent nightly development builds and stable releases typically every few weeks.

Warnings

Install

Imports

Quickstart

This quickstart defines a simple task and a flow that calls it, then uses `serve` to register the flow as a deployment scheduled to run every 10 seconds. `serve` starts a long-running process that polls the Prefect API for scheduled runs and executes them locally. It does not launch the UI, and Prefect 3 has no agents. To observe flow runs and their states in the Prefect UI (http://localhost:4200), start a local server with `prefect server start` in a separate terminal. To connect to Prefect Cloud instead, set the `PREFECT_API_URL` and `PREFECT_API_KEY` environment variables.

import os
from prefect import flow, task, serve

@task
def greet_task(name: str):
    """A simple task that prints a greeting."""
    print(f"Hello from task, {name}!")
    return f"Task processed: {name}"

@flow(log_prints=True)
def my_orchestrated_flow(name: str = "World"):
    """A flow that uses the greet_task and prints its result."""
    print(f"Flow starting for {name}...")
    task_result = greet_task(name)
    print(f"Flow received result: {task_result}")
    return f"Flow completed for {name}"

if __name__ == "__main__":
    # To connect to Prefect Cloud, set these environment variables:
    # os.environ['PREFECT_API_URL'] = os.environ.get('PREFECT_API_URL', 'https://api.prefect.cloud/api/accounts/.../workspaces/...')
    # os.environ['PREFECT_API_KEY'] = os.environ.get('PREFECT_API_KEY', 'pf_...')

    # `serve` registers the flow as a deployment (here scheduled every 10 seconds)
    # and starts a long-running process that polls for scheduled runs and executes them locally.
    # It does not start the UI: run `prefect server start` in another terminal,
    # then open http://localhost:4200
    print("Serving 'my_orchestrated_flow' locally. Open http://localhost:4200 in your browser.")
    serve(my_orchestrated_flow.to_deployment(name="my-first-deployment", interval=10))

    # To run the flow once as a plain Python call, without a deployment or schedule:
    # my_orchestrated_flow("Local User")
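
Because the Prefect Cloud connection depends on two environment variables, a small preflight check can catch a missing value before `serve` is called. The sketch below is illustrative only: `missing_cloud_settings` and `REQUIRED_CLOUD_VARS` are hypothetical names, not part of Prefect, and the check looks only at environment variables, so it assumes you are not supplying these settings through a Prefect profile instead.

```python
import os

# The two variables named in the quickstart for connecting to Prefect Cloud.
REQUIRED_CLOUD_VARS = ("PREFECT_API_URL", "PREFECT_API_KEY")


def missing_cloud_settings(env=None):
    """Return the required variable names that are unset or blank.

    `env` defaults to os.environ; passing a plain dict makes the check testable.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_CLOUD_VARS if not env.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_cloud_settings()
    if missing:
        print("Not configured for Prefect Cloud; missing: " + ", ".join(missing))
    else:
        print("Prefect Cloud settings found in the environment.")
```

Running this before the quickstart script gives an early, readable message rather than a connection error partway through startup.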
