Kubeflow Python SDK

0.4.0 · active · verified Fri Apr 17

The Kubeflow Python SDK (current version 0.4.0) provides a client library to programmatically manage machine learning workloads and interact with various Kubeflow APIs. It allows users to define, create, monitor, and delete training jobs (e.g., PyTorchJob, TFJob), hyperparameter optimization jobs (Katib), and other ML-related resources directly from Python. Releases are frequent, typically focusing on new features and bug fixes across minor versions.
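The job types named above (PyTorchJob, TFJob) are Kubernetes custom resources, so whatever a client library does, the cluster ultimately receives a manifest. As a rough sketch, assuming the Training Operator's `kubeflow.org/v1` PyTorchJob schema, that manifest can be built as a plain Python dictionary. `make_pytorchjob_manifest` is a hypothetical helper, not part of the Kubeflow SDK, and the layout of one `Master` replica plus optional `Worker` replicas follows common PyTorchJob examples rather than anything this page specifies.

```python
import json
from typing import Any, Dict, List


def make_pytorchjob_manifest(
    name: str,
    namespace: str,
    image: str,
    command: List[str],
    workers: int = 1,
) -> Dict[str, Any]:
    """Build a minimal kubeflow.org/v1 PyTorchJob manifest (hypothetical helper)."""

    def replica(count: int) -> Dict[str, Any]:
        # Each replica type gets its own pod template; the Training Operator
        # looks for a container named "pytorch" by default.
        return {
            "replicas": count,
            "restartPolicy": "OnFailure",
            "template": {
                "spec": {
                    "containers": [
                        {"name": "pytorch", "image": image, "command": command}
                    ]
                }
            },
        }

    # Assumed layout: exactly one Master, plus Workers only when requested.
    replica_specs: Dict[str, Any] = {"Master": replica(1)}
    if workers > 0:
        replica_specs["Worker"] = replica(workers)

    return {
        "apiVersion": "kubeflow.org/v1",
        "kind": "PyTorchJob",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "pytorchReplicaSpecs": replica_specs,
            # Delete all of the job's pods once it finishes.
            "runPolicy": {"cleanPodPolicy": "All"},
        },
    }


manifest = make_pytorchjob_manifest(
    name="my-pytorch-job",
    namespace="kubeflow",
    image="pytorch/pytorch:1.9.0-cuda11.1-cudnn8-runtime",
    command=["python", "-c", "print('Hello Kubeflow!')"],
)
print(json.dumps(manifest, indent=2))
```

Assuming the PyTorchJob CRD is installed, a dict like this could be submitted with the Kubernetes Python client's `CustomObjectsApi.create_namespaced_custom_object(group="kubeflow.org", version="v1", namespace=..., plural="pytorchjobs", body=manifest)`. The SDK exists so that you rarely have to write this structure by hand.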

Install

pip install -U kubeflow

Imports

from kubeflow.trainer import CustomTrainer, TrainerClient

Quickstart

This quickstart demonstrates how to submit a simple PyTorch training job with the Kubeflow SDK's `TrainerClient`, read its status, and optionally wait for it to finish or delete it. It requires a running Kubeflow deployment and a `kubectl` context that points to it.

# NOTE: This example requires a Kubernetes cluster with Kubeflow Trainer installed
# and a kubectl context pointing at it. The job is created in the namespace of
# the current kubeconfig context.
from kubeflow.trainer import CustomTrainer, TrainerClient


def train_fn():
    # Runs on each training node. The SDK ships this function's source to the
    # cluster, so keep any imports it needs inside its body.
    print("Hello Kubeflow!")


try:
    # Initialize the TrainerClient from the current kubeconfig context
    client = TrainerClient()

    # Create the training job, assuming the "torch-distributed" runtime is
    # installed on the cluster. train() returns the generated job name.
    job_name = client.train(
        runtime=client.get_runtime("torch-distributed"),
        trainer=CustomTrainer(func=train_fn, num_nodes=1),
    )
    print(f"Job '{job_name}' created.")

    # Get job status (optional)
    job = client.get_job(job_name)
    print(f"Job status: {job.status}")

    # Wait for job completion (optional, blocks until done or timeout)
    # client.wait_for_job_status(job_name)
    # print(f"Job '{job_name}' completed.")

    # Delete the job (optional, uncomment to enable)
    # client.delete_job(job_name)
    # print(f"Job '{job_name}' deleted.")

except Exception as e:
    print(f"An error occurred: {e}")
    print("Ensure your kubectl context is correctly configured and pointing to a Kubeflow cluster.")
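The optional wait step in the quickstart amounts to polling the job's status until it reaches a terminal state or a timeout expires. The sketch below shows that pattern using only the standard library. `wait_until_terminal`, the `get_status` callable, and the state names `Complete` and `Failed` are hypothetical stand-ins chosen for illustration, not the SDK's own implementation.

```python
import time
from typing import Callable, Iterable


def wait_until_terminal(
    get_status: Callable[[], str],
    terminal_states: Iterable[str] = ("Complete", "Failed"),
    timeout: float = 600.0,
    poll_interval: float = 2.0,
) -> str:
    """Poll get_status() until it returns a terminal state (hypothetical helper).

    get_status stands in for a call that reads the job's current state from
    the cluster. Raises TimeoutError if no terminal state appears in time.
    """
    terminal = set(terminal_states)
    # monotonic() is unaffected by wall-clock changes, so the deadline is reliable.
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still '{status}' after {timeout} seconds")
        time.sleep(poll_interval)


# Usage with a fake status source that reaches "Complete" on the third poll.
fake_states = iter(["Created", "Running", "Complete"])
final = wait_until_terminal(lambda: next(fake_states), poll_interval=0.01)
print(final)
```

Treating `Failed` as terminal matters: without it, a job that fails early would keep the caller blocked until the full timeout elapses.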
