Flytekit Pod Plugin

raw JSON →
1.16.19 verified Fri May 01 auth: no python

Flytekit plugin enabling K8s Pod tasks in Flyte workflows. Allows users to define custom Pod specifications for running tasks on Kubernetes, providing fine-grained control over resources, sidecars, and scheduling. Current version: 1.16.19. Release cadence is tied to Flytekit releases.

pip install flytekitplugins-pod
error ImportError: cannot import name 'PodTemplate' from 'flytekit.extras.pod'
cause Breaking change in Flytekit 1.0: PodTemplate moved to a separate plugin package.
fix
Run 'pip install flytekitplugins-pod' and import from 'flytekitplugins.pod'.
error FlyteExecutionError: Runtime error: Pod failed with status: Container 'my-container' not found in PodSpec.
cause The 'primary_container_name' in PodTemplate does not match any container name in the PodSpec's containers list.
fix
Check that 'primary_container_name' matches a container name in the pod spec.
breaking The 'PodTemplate' class was moved to 'flytekitplugins.pod' in Flytekit 1.0. Importing from the old path 'flytekit.extras.pod' will raise an ImportError.
fix Use 'from flytekitplugins.pod import PodTemplate'.
deprecated Using 'V1PodSpec' directly is supported but some fields like 'node_selector' are deprecated in favor of 'affinity' or 'tolerations' in Kubernetes 1.24+. The plugin may still accept them but they no longer take effect.
fix Review Kubernetes API deprecations and update PodSpec accordingly.
gotcha The 'primary_container_name' field in PodTemplate must match a container name in the PodSpec. If they mismatch, the task execution will fail with a generic error.
fix Ensure 'primary_container_name' matches a 'name' in the 'containers' list.

Define a Flyte task with a custom PodTemplate that sets resource requests/limits.

from flytekit import task, workflow
from flytekitplugins.pod import PodTemplate
from kubernetes.client.models import V1PodSpec, V1Container, V1ResourceRequirements

@task(
    pod_template=PodTemplate(
        primary_container_name="my-container",
        pod_spec=V1PodSpec(
            containers=[
                V1Container(
                    name="my-container",
                    resources=V1ResourceRequirements(
                        requests={"cpu": "1", "memory": "2Gi"},
                        limits={"cpu": "2", "memory": "4Gi"}
                    )
                )
            ]
        )
    )
)
def my_pod_task() -> str:
    return "Hello from Pod task"

@workflow
def wf() -> str:
    return my_pod_task()

if __name__ == "__main__":
    print(wf())