Kubeflow Python SDK
The Kubeflow Python SDK (current version 0.4.0) provides a client library for programmatically managing machine learning workloads and interacting with Kubeflow APIs. It lets users define, create, monitor, and delete training jobs (e.g., PyTorchJob, TFJob), hyperparameter optimization jobs (Katib), and other ML-related resources directly from Python. Releases are frequent; minor versions typically bring new features and bug fixes.
Common errors
- ModuleNotFoundError: No module named 'kubeflow.sdk'
  Cause: the 'kubeflow' package is not installed, or the import path is incorrect.
  Fix: install the package with `pip install kubeflow` and ensure your imports follow the pattern `from kubeflow.sdk.<submodule> import <Symbol>`.
- ApiException: (404) Reason: Not Found
  Cause: the Kubeflow API server could not find the specified resource (e.g., job, namespace), or the kubectl context is misconfigured.
  Fix: verify the namespace and job name. Check your `kubectl` context with `kubectl config current-context` and ensure it can access the Kubeflow cluster. Also verify that the required CRDs (e.g., 'pytorchjobs.kubeflow.org') exist on the cluster.
- AttributeError: 'NoneType' object has no attribute 'metadata'
  Cause: `create_job` or `get_job` returned `None` because job creation failed or the job could not be retrieved, often due to an underlying API error or a malformed job spec.
  Fix: add error handling around API calls, inspect the `training_job` definition for correctness, and check the logs of the Kubeflow API server or the relevant operator (e.g., `pytorch-operator`) for more specific errors.
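A defensive pattern for the `NoneType` case is to fail fast with a descriptive error as soon as an API call returns `None`, rather than hitting an `AttributeError` later. A minimal sketch with a stub client; `require` and `StubClient` are hypothetical helpers, not part of the SDK:

```python
# Sketch: fail fast when an SDK call returns None instead of a job object.
# `require` is a hypothetical helper, not part of the Kubeflow SDK.

def require(value, what):
    """Raise a descriptive error instead of a later AttributeError."""
    if value is None:
        raise RuntimeError(f"{what} returned None -- check the job spec and operator logs")
    return value

class StubClient:
    """Stands in for TrainerClient; create_job here simulates a failed creation."""
    def create_job(self, job):
        return None  # a real client may return None on failure

client = StubClient()
try:
    created = require(client.create_job(job={}), "create_job")
    print(created.metadata.name)  # never reached in this sketch
except RuntimeError as e:
    print(e)
```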
Warnings
- breaking The `PodTemplateOverrides` API for custom pod modifications in training jobs has been replaced by `RuntimePatches` starting from Kubeflow SDK v0.4.0.
- gotcha The Kubeflow SDK client interacts with a remote Kubeflow cluster. Your Python environment needs a configured `kubectl` context (e.g., ~/.kube/config) pointing to a running Kubeflow instance.
- gotcha There can be compatibility issues between the Kubeflow SDK version and the version of Kubeflow deployed on your cluster, especially with CRD (Custom Resource Definition) versions for training operators (e.g., PyTorchJob, TFJob).
- gotcha The `TrainerClient` and `OptimizerClient` are distinct and manage different components of Kubeflow. Ensure you are importing and using the correct client for training jobs (`TrainerClient`) versus hyperparameter optimization jobs (`OptimizerClient`).
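Given the kubeconfig gotcha above, it can help to confirm a kubeconfig exists and note its current context before instantiating any client. A stdlib-only sketch; the naive line scan assumes a typical single-document kubeconfig, and a real check would use a YAML parser:

```python
import os

def current_kube_context(path="~/.kube/config"):
    """Return the kubeconfig's current-context, or None if unavailable."""
    path = os.path.expanduser(path)
    if not os.path.exists(path):
        return None
    with open(path) as f:
        for line in f:
            # naive scan for the top-level `current-context:` key
            if line.startswith("current-context:"):
                return line.split(":", 1)[1].strip().strip('"')
    return None

ctx = current_kube_context()
print(ctx if ctx else "No kubeconfig / current-context found")
```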
Install
- pip install kubeflow
Imports
- TrainerClient
from kubeflow.sdk.training import TrainerClient
- TrainingJob
from kubeflow.sdk.training.api import TrainingJob
- V1PyTorchJob
from kubeflow.sdk.training.models import V1PyTorchJob
- OptimizerClient
from kubeflow.sdk.optimizer import OptimizerClient
Quickstart
import os
from kubeflow.sdk.training import TrainerClient
from kubeflow.sdk.training.api import TrainingJob
from kubeflow.sdk.training.models import V1PyTorchJob, V1RunPolicy
# NOTE: This example requires a running Kubeflow cluster and configured kubectl context.
# It will create a PyTorch training job in the 'kubeflow' namespace.
# Define your training job
training_job = TrainingJob(
    api_version="kubeflow.org/v1",
    kind="PyTorchJob",
    metadata={
        "name": os.environ.get('KF_JOB_NAME', 'my-pytorch-job'),
        "namespace": os.environ.get('KF_NAMESPACE', 'kubeflow')
    },
    spec=V1PyTorchJob(
        pytorch_replica_specs={
            "Worker": {
                "replicas": 1,
                "restartPolicy": "OnFailure",
                "template": {
                    "spec": {
                        "containers": [
                            {
                                "name": "pytorch",
                                "image": "pytorch/pytorch:1.9.0-cuda11.1-cudnn8-runtime",
                                "command": ["python", "-c", "print('Hello Kubeflow!')"],
                            }
                        ]
                    }
                },
            }
        },
        run_policy=V1RunPolicy(clean_pod_policy="All"),
    ),
)
# Initialize the TrainerClient
try:
    trainer_client = TrainerClient()
    # Create the training job on the Kubeflow cluster
    created_job = trainer_client.create_job(job=training_job)
    print(f"Job '{created_job.metadata.name}' created in namespace '{created_job.metadata.namespace}'.")
    # Wait for job completion (optional, can block)
    # trainer_client.wait_for_job_completion(name=created_job.metadata.name, namespace=created_job.metadata.namespace)
    # print(f"Job '{created_job.metadata.name}' completed.")
    # Get job status (optional)
    status = trainer_client.get_job_status(name=created_job.metadata.name, namespace=created_job.metadata.namespace)
    print(f"Job status: {status.state}")
    # Delete the job (optional, uncomment to enable)
    # trainer_client.delete_job(name=created_job.metadata.name, namespace=created_job.metadata.namespace)
    # print(f"Job '{created_job.metadata.name}' deleted.")
except Exception as e:
    print(f"An error occurred: {e}")
    print("Ensure your kubectl context is correctly configured and pointing to a Kubeflow cluster.")
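The nested `pytorch_replica_specs` mapping in the quickstart can be generated by a small helper that keeps the container details in one place. A sketch; `worker_spec` is a hypothetical convenience function, not part of the SDK:

```python
def worker_spec(image, command, replicas=1, restart_policy="OnFailure"):
    """Build the 'Worker' entry for pytorch_replica_specs (hypothetical helper)."""
    return {
        "Worker": {
            "replicas": replicas,
            "restartPolicy": restart_policy,
            "template": {
                "spec": {
                    "containers": [
                        {"name": "pytorch", "image": image, "command": list(command)}
                    ]
                }
            },
        }
    }

specs = worker_spec(
    image="pytorch/pytorch:1.9.0-cuda11.1-cudnn8-runtime",
    command=["python", "-c", "print('Hello Kubeflow!')"],
)
print(specs["Worker"]["replicas"])  # → 1
```

The resulting dict can be passed directly as `pytorch_replica_specs=specs` when constructing `V1PyTorchJob`.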