Prefect Kubernetes Integration
Prefect-kubernetes provides integrations for running Prefect flows on Kubernetes. It enables Prefect 2.x deployments to provision Kubernetes Jobs for flow runs using `KubernetesDeployment` and manage them with `KubernetesWorker`. The current version is 0.7.7, and it follows the Prefect core library's release cadence, with integrations typically updated alongside major Prefect releases.
Warnings
- breaking Prefect 1.x `KubernetesAgent` and `KubernetesRunner` are deprecated and replaced by Prefect 2.x `KubernetesWorker` and `KubernetesDeployment`. Migrating from Prefect 1.x to 2.x requires a complete rewrite of your deployment definitions and infrastructure setup for Kubernetes.
- gotcha Understanding the interaction between Prefect 2.x Workers, Work Pools, and Kubernetes Jobs is crucial. A `KubernetesWorker` polls a `KubernetesWorkPool` for flow runs, and for each run, it dynamically creates a Kubernetes Job. This is different from Prefect 1.x 'Agents' which directly ran flow code.
- gotcha Correct Kubernetes authentication and configuration (e.g., `kubeconfig` context or in-cluster service account permissions) are essential. Misconfiguration often leads to `prefect-kubernetes` failing to create or monitor Kubernetes Jobs.
- gotcha The Docker image specified in a `KubernetesDeployment` must contain your flow's code and all its Python dependencies. If the image is private, Kubernetes also needs image pull secrets configured.
Install
-
pip install prefect-kubernetes
Imports
- KubernetesDeployment
from prefect_kubernetes.deployments import KubernetesDeployment
- KubernetesWorker
from prefect_kubernetes.workers import KubernetesWorker
Quickstart
import os
from prefect import flow
from prefect_kubernetes.deployments import KubernetesDeployment
# 1. Define a simple Prefect Flow
@flow(log_prints=True)
def hello_kubernetes_flow(name: str = "world"):
import platform
print(f"Hello, {name} from Kubernetes! Running on {platform.node()}")
# 2. Create a Kubernetes Deployment definition
# This deployment will tell Prefect how to run `hello_kubernetes_flow` on Kubernetes.
# It requires a work pool named "kubernetes-work-pool" to exist in your Prefect server.
# Create it with: `prefect work-pool create kubernetes-work-pool --type kubernetes`
# IMPORTANT: The 'image' specified here must contain your flow code and Prefect installed.
# For a real scenario, you would build a custom Docker image for your flow.
# Example custom image setup:
# Dockerfile:
# FROM prefecthq/prefect:2-python3.10
# COPY quickstart_kubernetes.py /app/quickstart_kubernetes.py
# WORKDIR /app
# Build & Push:
# docker build -t your-registry/your-flow-image:latest .
# docker push your-registry/your-flow-image:latest
# Then use `your-registry/your-flow-image:latest` below.
try:
deployment = KubernetesDeployment(
name="hello-kubernetes-flow-deployment",
description="Runs a simple 'Hello, Kubernetes!' flow on Kubernetes.",
flow_name=hello_kubernetes_flow.name,
work_pool_name="kubernetes-work-pool", # This work pool MUST exist in your Prefect server
image="prefecthq/prefect:2-python3.10", # Base Prefect image; replace for custom flows
entrypoint="quickstart_kubernetes.py:hello_kubernetes_flow", # Assumes this code is saved as quickstart_kubernetes.py
# You can also pass job_variables or infra_overrides for custom Kubernetes Job spec
# job_variables={"MY_ENV_VAR": "my_value"},
# infra_overrides={
# "job_template": {
# "spec": {"template": {"spec": {"nodeSelector": {"kubernetes.io/hostname": "your-node"}}}}
# }
# }
)
deployment_id = deployment.apply()
print(f"Deployment '{deployment.name}' created/updated with ID: {deployment_id}")
except Exception as e:
print(f"Failed to create Kubernetes Deployment: {e}")
print("Ensure PREFECT_API_URL is set and Prefect server is running.")
# To start the Kubernetes Worker that will pick up runs for this deployment:
# prefect worker start --pool kubernetes-work-pool